原创 JAVA并发包解析(二)——常用同步器分析1

发布时间:2021-06-24 20:10:05 浏览 63 来源:猿笔记 作者:??LZ

    可以让线程等待其他线程执行完毕后再进行执行,比如执行一段业务逻辑需要进行多个远程调用来聚合结果,当state为0的时候表示没有线程占有锁,所以需要几个方法来判断当前线程是否持有锁,如果当前线程没有持有锁就抛出异常,6当state为0的时候尝试获取锁,如果成功则设置当前线程为锁持有者,如果锁已经被持有则判断当前线程是否持有锁;是否有其他线程正在排队获取锁,如果有其他线程正在排队获取锁,那么这个当前线程不会获取锁而是加入自旋排队,非公平在调用lock方法的时候会尝试直接获取锁这个时候有其他线程正在阻塞中也有可能直接获取到首先加锁在判断线程是否被中断抛出异常


    #主题列表:juejin,github,smartblue,cyanosis,channing-cyan,fancy,hydrogen,condensed-night-purple,greenwillow,v-green,vue-pro,healer-readable,mk-cute,jzman,geek-black,awesome-green,qklhk-chocolate

    #投稿主题:

    theme:juejin

    highlight:

    ##前言

    最后一篇文章分析了AQS的实现原理,AQS在开始的时候留下了几个方法供子类尝试实现。根据这些方法,可以实现具有不同功能的同步器。看看并发包下各种同步器的实现细节。

    ##正文

    ###CountDownLatch

    CountDownLatch是一个允许线程等待其他线程执行的计数器。例如,执行一个业务逻辑需要多个远程调用来聚合结果。多线程可以用来加快接口的访问速度,CountDownLatch可以用来等待远程调用的结果。示例代码如下:

    javapublicstaticvoidmain(String[]args)throwsInterruptedException{CountDownLatchcount=newCountDownLatch(2);startThread(1,count);startThread(2,count);count.await();System.out.println("结束");}publicstaticvoidstartThread(intid,CountDownLatchcount){newThread(newRunnable(){@Overridepublicvoidrun(){try{Thread.sleep(1000);System.out.println("执行线程"+id);}catch(InterruptedExceptione){e.printStackTrace();}finally{count.countDown();}}}).start();}

    CountDownLatch维护了一个计数器,调用await方法,当计数器不为0的时候阻塞。根据这个思路,await应该调用的是AcquireShared方法,当tryAcquireShared判断不为0时候进入自旋。源码如下:

    javaprivatestaticfinalclassSyncextendsAbstractQueuedSynchronizer{privatestaticfinallongserialVersionUID=4982264981922014374L;Sync(intcount){setState(count);}intgetCount(){returngetState();}protectedinttryAcquireShared(intacquires){return(getState()==0)1:-1;}protectedbooleantryReleaseShared(intreleases){for(;;){intc=getState();if(c==0)returnfalse;intnextc=c-1;if(compareAndSetState(c,nextc))returnnextc==0;}}}

    使用构造函数传入一个初始值,tryReleaseShared使用原子方法给计数器减1,当为0的时候返回true,意味着AQS的releaseShared方法可以唤醒阻塞线程。

    javapublicvoidcountDown(){sync.releaseShared(1);}publicvoidawait()throwsInterruptedException{sync.acquireSharedInterruptibly(1);}

    *++可以看出CountDownLatch的实现并不复杂。只要wait被调用并且状态不是0,线程就会阻塞,直到其他线程将计数器减少到0++*

    ###ReentrantLock

    ReentrantLock是一个独占的可重入锁,基本上功能和synchronized差不多,但是ReentrantLock是可重入且支持公平和非公平模式的。

    java//当前线程持否持有锁rotectedfinalbooleanisHeldExclusively(){returngetExclusiveOwnerThread()==Thread.currentThread();}finalbooleanisLocked(){returngetState()!=0;}finalThreadgetOwner(){returngetState()==0null:getExclusiveOwnerThread();}由于ReentrantLock是独占锁,所以只有一个线程可以持有该锁,状态表示再入次数。当状态为0时,意味着没有线程持有锁,所以需要几种方法来判断当前线程是否持有锁。

    javaprotectedfinalbooleantryRelease(intreleases){intc=getState()-releases;if(Thread.currentThread()!=getExclusiveOwnerThread())thrownewIllegalMonitorStateException();booleanfree=false;if(c==0){free=true;setExclusiveOwnerThread(null);}setState(c);returnfree;}释放锁的方法,如果当前线程没有持有锁,则抛出异常。逻辑是状态减1。如果减为0,表示锁被释放。此时,AQS将触发通知来唤醒下一个节点。

    * *可重入锁定支持公平模式和飞行公平模式,让我们看看这两种模式之间的区别* *

    -不公平锁定

    javafinalbooleannonfairTryAcquire(intacquires){finalThreadcurrent=Thread.currentThread();intc=getState();if(c==0){if(compareAndSetState(0,acquires)){setExclusiveOwnerThread(current);returntrue;}}elseif(current==getExclusiveOwnerThread()){intnextc=c+acquires;if(nextc<0)//overflowthrownewError("Maximumlockcountexceeded");setState(nextc);returntrue;}returnfalse;}状态为0时尝试获取锁。如果成功,将当前线程设置为锁持有者。如果锁已经持有,判断当前线程是否持有锁。如果是,请重新输入次数+1

    javafinalvoidlock(){if(compareAndSetState(0,1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}先尝试获取不公平锁的锁,如果没有获取锁,则进入获取方式。

    -漂亮的锁

    javaprotectedfinalbooleantryAcquire(intacquires){finalThreadcurrent=Thread.currentThread();intc=getState();if(c==0){if(!hasQueuedPredecessors()&&compareAndSetState(0,acquires)){setExclusiveOwnerThread(current);returntrue;}}elseif(current==getExclusiveOwnerThread()){intnextc=c+acquires;if(nextc<0)thrownewError("Maximumlockcountexceeded");setState(nextc);returntrue;}returnfalse;}公平锁的主要区别在于hasQueuedPredecessors,是否有其他线程排队获取锁,如果有其他线程排队获取锁,那么当前线程会加入自旋队列而不是获取锁。

    javafinalvoidlock(){acquire(1);}

    公平锁直接进入AQS的acquire方法,执行tryAcquire逻辑判断是否有其他线程正在等待

    * *公平锁和不公平锁的区别在于,当不公平调用lock方法时,会尝试直接获取锁。此时,其他线程正在阻塞,可能会直接获得锁,但是公平锁有一个顺序。**

    *++ReentrantLock也支持条件变量,使用newCondition来新建一个条件变量,当获取锁以后可以使用wait来阻塞当前线程,之前的文章提到过Condition使用方法++*

    ###CyclicBarrier

    CyclicBarrier内部的一组线程互相等待执行完毕后再执行下一步,和CountDownLatch有些相似;CountDownLatch只能使用一次,而CyclicBarrier能使用多次,其中实现原理也不相同。示例代码如下:

    javaprivatestaticCyclicBarriercyclicBarrier;publicstaticvoidmain(String[]args)throwsInterruptedException{cyclicBarrier=newCyclicBarrier(2,newRunnable(){@Overridepublicvoidrun(){System.out.println("执行");}});startThread(1);startThread(2);}publicstaticvoidstartThread(intid){newThread(newRunnable(){@Overridepublicvoidrun(){try{Thread.sleep(1000);System.out.println("执行线程"+id);cyclicBarrier.await();\t\tSystem.out.println("线程到达"+id);}catch(BrokenBarrierException|InterruptedExceptione){e.printStackTrace();}}}).start();}

    源代码如下:

    javaprivatefinalReentrantLocklock=newReentrantLock();/**Conditiontowaitonuntiltripped*/privatefinalConditiontrip=lock.newCondition();/**Thenumberofparties*/privatefinalintparties;/*Thecommandtorunwhentripped*/privatefinalRunnablebarrierCommand;/**Thecurrentgeneration*/privateGenerationgeneration=newGeneration();/***Numberofpartiesstillwaiting.Countsdownfrompartiesto0*oneachgeneration.Itisresettopartiesoneachnew*generationorwhenbroken.*/privateintcount;CyclicBarrier使用可重入锁定和条件的组合。看看等待的方法

    javapublicintawait()throwsInterruptedException,BrokenBarrierException{try{returndowait(false,0L);}catch(TimeoutExceptiontoe){thrownewError(toe);//cannothappen}}

    调用了dowait不带超时时间

    javalock.lock();try{finalGenerationg=generation;if(g.broken)thrownewBrokenBarrierException();if(Thread.interrupted()){breakBarrier();thrownewInterruptedException();}intindex=--count;if(index==0){//trippedbooleanranAction=false;try{finalRunnablecommand=barrierCommand;if(command!=null)command.run();ranAction=true;nextGeneration();return0;}finally{if(!ranAction)breakBarrier();}}privatevoidbreakBarrier(){generation.broken=true;count=parties;trip.signalAll();}DoWait前半段代码被锁定,判断线程是否中断,抛出异常;否则,计数递减。当count为0时,所有线程都准备好了。如果我们提前通过Runnable,我们将执行这个逻辑并调用下一代。

    javaprivatevoidnextGeneration(){trip.signalAll();count=parties;generation=newGeneration();}Next Generation的功能是唤醒被阻塞的线程,重置计数器,以备下次使用。如果index不为0,则意味着还有其他线程没有完成执行,因此转到下一个代码

    javafor(;;){try{if(!timed)trip.await();elseif(nanos>0L)nanos=trip.awaitNanos(nanos);}catch(InterruptedExceptionie){if(g==generation&&!g.broken){breakBarrier();throwie;}else{Thread.currentThread().interrupt();}}if(g.broken)thrownewBrokenBarrierException();if(g!=generation)returnindex;if(timed&&nanos<=0L){breakBarrier();thrownewTimeoutException();}}}finally{lock.unlock();}其实内核是用条件变量来阻塞这个线程的,另一个是处理异常情况,因为阻塞过程中其他线程可能会重置CyclicBarrier,所以需要抛出异常。

    *++CyclicBarrier核心就是使用count来进行计数,ReentrantLock和Condition的配合使用,每次调用都将count减少,当某个调用减少为0的时候就唤醒所有被阻塞的线程++*

    ##总结

    这篇文章讲了CountDownLatch、ReentrantLock和CyclicBarrier,在AQS的的基础上进行扩展,实现起来非常简单。

作者信息

??LZ [等级:3] java开发工程师
发布了 22 篇专栏 · 获得点赞 18 · 获得阅读 2147

相关推荐 更多