Phaser并发阶段器Phaser由JDK1.7提出,是一个复杂强大的同步辅助类,是对同步工具类CountDownLatch和CyclicBarrier的综合升级,能够支持分阶段实现等待的业务场景。但是,这个错误可能会导致{@codeIllegalStateException}仅在一些后续操作这个相位器,如果有的话。Phaser解决分科考试问题从体验的示例中其实没看出其优势在哪里,上诉场景完全可以采用CountDownLatch,所以现在换一种场景来说明Phaser的优势。

Phaser由JDK1.7提出,是一个复杂强大的同步辅助类,是对同步工具类CountDownLatch和CyclicBarrier的综合升级,能够支持分阶段实现等待的业务场景。
我们可以回忆下CountDownLatch讲的是先指定N个线程,在N个线程干完活之前,其它线程都需要等待(导游等待旅游团所有人上车才能开车),而CyclicBarrier讲的是先指定N个线程。等N个线程到齐了大家同时干活(多个驴友相约去旅游,先到的需要等待后来的),而Phaser是两者的结合,可以理解为先指定N个线程,等N个线程到齐后开始干第一阶段的活,等第一阶段所有的线程都干完活了,接着N个线程开始干第二阶段的活,直到所有的阶段完成工作,程序结束,当然需要注意的是每个阶段可以根据业务需要新增或者删除一些线程,并不是开始指定多少个线程每个阶段就必须有多少个线程。
入门体验看了概念可能不容易理解,从一个小demo入手体验下
publicclassPhaserDemo1{//指定随机种子privatestaticRandomrandom=newRandom(System.currentTimeMillis());publicstaticvoidmain(String[]args){Phaserphaser=newPhaser();//将线程注册到phaserphaser.register();for(inti=0;i<5;i){Tasktask=newTask(phaser);task.start();}phaser.arriveAndAwaitAdvance();System.out.println("alltaskexecuteclose");}staticclassTaskextendsThread{Phaserphaser;publicTask(Phaserphaser){this.phaser=phaser;this.phaser.register();}@Overridepublicvoidrun(){try{System.out.println(Thread.currentThread().getName() "开始执行");TimeUnit.SECONDS.sleep(random.nextInt(5));System.out.println(Thread.currentThread().getName() "执行完毕");//类似CountDownLatch中的awaitphaser.arriveAndAwaitAdvance();}catch(InterruptedExceptione){e.printStackTrace();}}}}
不知道有没有这样的疑惑,phaser.register是向phaser去注册这个线程,那么为什么主线程也需要注册呢?
其实很简单主线程需要等待所有子线程执行完毕才能继续往下面执行所以必须要phaser.arriveAndAwaitAdvance();阻塞等待,而这个语句是意思当前线程已经到达屏障,在此等待一段时间等条件满足后需要向下一个屏障继续执行,如果没有主线程的phaser.register,直接调用phaser.arriveAndAwaitAdvance,在源码中提到可能会有异常,所以必须在主程序中注册phaser.register();
/*<p>Itisausageerrorforanunregisteredpartytoinvokethis*method.However,thiserrormayresultinan{@code*IllegalStateException}onlyuponsomesubsequentoperationon*thisphaser,ifever.*/译:未注册方调用此函数是一个使用错误方法。但是,这个错误可能会导致{@codeIllegalStateException}仅在一些后续操作这个相位器,如果有的话。
从体验的示例中其实没看出其优势在哪里,上诉场景完全可以采用CountDownLatch,所以现在换一种场景来说明Phaser的优势。
假设某校举行期末考试,有三门考试语文、数学、英语,每门课允许学生提前交卷,只有当所有学生完成考试后才能举行下一次的考试,这就是典型的分阶段任务处理,示例图如下。
将上诉场景语义化如下
publicclassPhaserExam{publicstaticRandomrandom=newRandom(System.currentTimeMillis());publicstaticvoidmain(String[]args){//一次初始化2个相当于两次registerPhaserphaser=newPhaser(2);for(inti=0;i<2;i){Examexam=newExam(phaser,random.nextLong());exam.start();}}staticclassExamextendsThread{Phaserphaser;Longid;publicExam(Phaserphaser,Longid){this.phaser=phaser;this.id=id;}@Overridepublicvoidrun(){try{System.out.println(Thread.currentThread().getName() "===开始语文考试");TimeUnit.SECONDS.sleep(random.nextInt(5));System.out.println(Thread.currentThread().getName() "===结束语文考试");phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName() "===开始数学考试");TimeUnit.SECONDS.sleep(random.nextInt(5));System.out.println(Thread.currentThread().getName() "===结束数学考试");phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName() "===开始英语考试");TimeUnit.SECONDS.sleep(random.nextInt(5));System.out.println(Thread.currentThread().getName() "===结束英语考试");phaser.arriveAndAwaitAdvance();}catch(InterruptedExceptione){e.printStackTrace();}}}}
代码执行结果如下,可以看到三个阶段都是等待所有线程执行完毕后才往下执行,相当于多个栅栏。
到这里请注意,通过Phaser类的构造方法构建的party数,也就是线程数需要和循环的次数对应,不然可能影响后续阶段器的正常运行。
两个重要状态在Phaser内有2个重要状态,分别是phase和party,乍一看很难理解,他们的定义如下。
phase就是阶段,如上面提到的语文、数学、英语考试这每个考试对应一个阶段,不过phase是从0开始的,当所有任务执行完毕,准备进入下一个阶段时phase就会加一。
party对应注册到Phaser线程数,party初始值有两种形式
- 方法一就是通过Phaser的有参构造初始化party值。
- 方法二采用动态注册方法phaser.register()或phaser.bulkRegister(线程数)指定线程数,注销线程调用phaser.arriveAndDeregister()方法party值会减一。
Phaser常用API总结如下所示
//获取Phaser阶段数,默认0publicfinalintgetPhase();//向Phaser注册一个线程publicintregister();//向Phaser注册多个线程publicintbulkRegister(intparties);//获取已经注册的线程数,也就是重要状态party的值publicintgetRegisteredParties();//到达并且等待其它线程到达publicintarriveAndAwaitAdvance();//到达后注销不等待其它线程,继续往下执行publicintarriveAndDeregister();//已到达线程数publicintgetArrivedParties();//未到达线程数publicintgetUnarrivedParties();//Phaser是否结束只有当party的数量是0或者调用方法forceTermination时才会结束publicbooleanisTerminated();//结束PhaserpublicvoidforceTermination();
代码演示如下
publicclassPhaserApiTest{publicstaticvoidmain(String[]args)throwsInterruptedException{Phaserphaser=newPhaser(5);System.out.println("当前阶段" phaser.getPhase());System.out.println("注册线程数===" phaser.getRegisteredParties());//向phaser注册一个线程phaser.register();System.out.println("注册线程数===" phaser.getRegisteredParties());//向phaser注册多个线程,批量注册phaser.bulkRegister(4);System.out.println("注册线程数===" phaser.getRegisteredParties());newThread(()->{//到达且等待phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName() "===执行1");}).start();newThread(()->{//到达不等待,从phaser中注销一个线程phaser.arriveAndDeregister();System.out.println(Thread.currentThread().getName() "===执行2");}).start();TimeUnit.SECONDS.sleep(3);System.out.println("已到达线程数===" phaser.getArrivedParties());System.out.println("未到达线程数===" phaser.getUnarrivedParties());System.out.println("Phaser是否结束" phaser.isTerminated());phaser.forceTermination();System.out.println("Phaser是否结束" phaser.isTerminated());}}
执行结果如下所示
arriveAndAwaitAdvance解析arriveAndAwaitAdvance是Phaser中一个重要实现阻塞的API,其实arriveAndAwaitAdvance是由arrive方法和awaitAdvance方法合并而来,两个方法的作用分别为
- arrive:到达屏障但不阻塞,返回值为到达的阶段号。
- awaitAdvance(int):接收一个 int 值的阶段号,在指定的屏障处阻塞。
测试代码如下
publicclassPhaserTestArrive{publicstaticRandomrandom=newRandom(System.currentTimeMillis());publicstaticvoidmain(String[]args){Phaserphaser=newPhaser(5);for(inti=0;i<5;i){newTask(i,phaser).start();}phaser.register();//主线程需要调用arrive的原因是主线程注册的第六个线程还未到达,需要手动到达,才能调用awaitAdvance阻塞屏障phaser.arrive();//因为Phaser线程数为6,所以即使5个线程已经到达,但是还差主线程的一个,目前阶段数就是0phaser.awaitAdvance(0);System.out.println("alltaskisend");}staticclassTaskextendsThread{Phaserphaser;publicTask(intnum,Phaserphaser){super("Thread--" String.valueOf(num));this.phaser=phaser;}@Overridepublicvoidrun(){try{System.out.println(Thread.currentThread().getName() "===task1isstart");TimeUnit.SECONDS.sleep(random.nextInt(3));System.out.println(Thread.currentThread().getName() "===task1isend");//到达且不等待phaser.arrive();System.out.println(Thread.currentThread().getName() "===task2isstart");TimeUnit.SECONDS.sleep(random.nextInt(3));System.out.println(Thread.currentThread().getName() "===task2isend");}catch(InterruptedExceptione){e.printStackTrace();}}}}
我们需要特别注意的就是Phaser所有API中只有awaitAdvanceInterruptibly是响应中断的,其余全部不会响应中断所以不需要对其进行异常处理,演示如下
publicstaticvoidmain(String[]args){Phaserphaser=newPhaser(3);ThreadT1=newThread(()->{try{phaser.awaitAdvanceInterruptibly(phaser.getPhase());}catch(InterruptedExceptione){System.out.println("中断异常");e.printStackTrace();}//phaser.arriveAndAwaitAdvance();});T1.start();T1.interrupt();phaser.arriveAndAwaitAdvance();}
