Java中Phaser控制多阶段任务方法

Phaser支持动态注册与多阶段同步,适用于线程数可变的分阶段任务,通过arriveAndAwaitAdvance实现阶段等待,register/bulkRegister添加参与者,getPhase获取当前阶段,示例中3个线程协同完成初始化、数据处理与汇总三阶段任务。

在Java并发编程中,Phaser 是一个灵活的同步工具,适用于协调多个线程执行多阶段任务。与 CountDownLatch 或 CyclicBarrier 不同,Phaser 支持动态注册和分阶段同步,特别适合需要分步完成且参与线程数量可能变化的场景。

Phaser的基本概念

Phaser 可以看作是 CyclicBarrier 和 CountDownLatch 的结合体,但它更强大:

  • 支持多个阶段(phase)的同步
  • 线程可以动态地注册(register)或注销(arriveAndDeregister)
  • 每个阶段结束后自动进入下一阶段
  • 可重用,无需显式重置

核心方法说明

以下是Phaser常用的关键方法:

  • arriveAndAwaitAdvance():当前线程到达当前阶段,等待其他参与者完成该阶段后再一起进入下一阶段
  • arriveAndDeregister():到达并注销,表示该线程不再参与后续阶段
  • register():动态注册一个新参与者
  • bulkRegister(n):批量注册n个参与者
  • getPhase():获取当前阶段编号(从0开始)

多阶段任务示例

假设我们有三个阶段的任务:初始化、处理数据、汇总结果。多个工作线程需协同完成每一步。

import java.util.concurrent.Phaser;

public class MultiStageTask {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3); // 初始3个参与者

        for (int i = 1; i <= 3; i++) {
            new Thread(new Worker(i, phaser), "Worker-" + i).start();
        }
    }

    static class Worker implements Runnable {
        private final int id;
        private final Phaser phaser;

        Worker(int id, Phaser phaser) {
            this.id = id;
            this.phaser = phaser;
        }

        @Override
        public void run() {
            // 阶段1:初始化
            System.out.println("Thread " + id + " 初始化完成");
            phaser.arriveAndAwaitAdvance();

            // 阶段2:处理数据
            System.out.println("Thread " + id + " 正在处理数据");
            simulateWork(500);
            phaser.arriveAndAwaitAdvance();

            // 阶段3:汇总前准备
            System.out.println("Thread " + id + " 准备汇总");
            phaser.arriveAndAwaitAdvance();

            // 所有阶段完成后退出
            System.out.println("Thread " + id + " 完成所有任务");
        }

        private void simulateWork(long ms) {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

输出会显示每个线程按阶段同步推进,所有线程都完成某一阶段后才集体进入下一阶段。

动态参与者管理

Phaser允许运行时添加或移除参与者。例如某个线程只参与前两个阶段:

  • 使用 phaser.register() 在任意时刻增加参与者
  • 调用 arriveAndDeregister() 让线程完成当前阶段后退出同步

比如在某线程最后阶段前注销:

// 某线程只参与前两阶段
phaser.arriveAndDeregister(); // 完成后不再等待后续阶段

此时其余线程仍可继续后续阶段,不受影响。

基本上就这些。Phaser 提供了对多阶段任务精细的控制能力,合理使用能有效简化复杂并发流程的设计。