在Java中如何使用Phaser实现线程同步

Phaser是Java中支持动态注册和注销的多阶段线程同步工具,适用于参与者数量不固定的并发场景。通过register()添加参与者,arriveAndAwaitAdvance()实现阶段同步,arriveAndDeregister()使线程完成任务后退出,可用于控制线程生命周期。示例展示三个线程分两阶段同步,以及主线程在第一阶段后动态添加新线程并最终注销自身,体现其灵活的阶段性协调能力。

Phaser 是 Java 并发包 java.util.concurrent 中提供的一种灵活的线程同步工具,它比 CountDownLatch 和 CyclicBarrier 更加通用。Phaser 支持动态注册和注销参与线程,适用于多阶段任务的同步场景。下面介绍如何在 Java 中使用 Phaser 实现线程同步。

Phaser 的基本概念

Phaser 可以看作是可重用的屏障(barrier),多个线程在某个阶段到达时进行同步。当所有参与者都到达当前阶段后,Phaser 会触发推进到下一阶段。与 CyclicBarrier 不同的是,Phaser 允许线程在运行过程中动态加入或退出,并支持分阶段协调。

关键方法说明:

  • arriveAndAwaitAdvance():当前线程到达屏障点并等待其他线程到达。
  • arriveAndDeregister():当前线程到达并从 Phaser 中注销,不再参与后续阶段。
  • register():动态注册一个新参与者。
  • bulkRegister(n):批量注册 n 个参与者。
  • getPhase():获取当前阶段编号(从0开始)。

简单示例:多阶段任务同步

假设有三个线程需要完成两个阶段的任务,每个阶段都需要所有线程完成后再进入下一阶段。

import java.util.concurrent.Phaser;

public class PhaserExample {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3); // 初始化3个参与者

        for (int i = 1; i <= 3; i++) {
            new Thread(new Worker(phaser, "Thread-" + i)).start();
        }
    }

    static class Worker implements Runnable {
        private final Phaser phaser;
        private final String name;

        Worker(Phaser phaser, String name) {
            this.phaser = phaser;
            this.name = name;
        }

        @Override
        public void run() {
            // 第一阶段
            System.out.println(name + " 到达第一阶段");
            phaser.arriveAndAwaitAdvance();

            // 模拟工作
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            // 第二阶段
            System.out.println(name + " 到达第二阶段");
            phaser.arriveAndAwaitAdvance();

            // 所有阶段完成后,线程结束
            System.out.println(name + " 完成任务");
        }
    }
}

动态注册与分阶段控制

Phaser 的优势之一是支持动态添加线程。例如,在某个阶段后启动新的任务线程。

public class DynamicPhaserExample {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1); // 主线程作为初始参与者

        // 启动前3个线程
        for (int i = 0; i < 3; i++) {
            phaser.register(); // 注册一个参与者
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " 开始第一阶段");
                phaser.arriveAndAwaitAdvance();

                System.out.println(Thread.currentThread().getName() + " 进入第二阶段");
                phaser.arriveAndAwaitAdvance();
            }, "Worker-" + i).start();
        }

        // 主线程等待第一阶段完成
        phaser.arriveAndAwaitAdvance();

        // 在第一阶段结束后,动态添加新线程
        System.out.println("主线程准备添加新任务...");
        phaser.register();
        new Thread(() -> {
            System.out.println("NewWorker 开始执行");
            phaser.arriveAndAwaitAdvance();
        }, "NewWorker").start();

        // 最后等待所有线程完成第二阶段
        phaser.arriveAndDeregister(); // 主线程注销
    }
}

使用 arriveAndDeregister() 控制生命周期

某些线程可能只参与部分阶段。通过 arriveAndDeregister() 可以让线程在完成任务后退出同步流程。

// 示例:某个线程只参与第一阶段
phaser.arriveAndDeregister(); // 到达并退出,不再等待后续阶段

这种机制适合用于“启动器”线程或初始化任务,它们只需确保准备工作完成即可退出。

基本上就这些。Phaser 适合用于需要多阶段协调、线程数量不固定或生命周期不同的并发场景。合理使用 register、arriveAndAwaitAdvance 和 arriveAndDeregister 能有效控制线程同步行为。