多线程并发之CountDownLatch(闭锁)使用详解

您所在的位置:网站首页 c语言中线程已退出什么意思 多线程并发之CountDownLatch(闭锁)使用详解

多线程并发之CountDownLatch(闭锁)使用详解

2024-07-06 15:39| 来源: 网络整理| 查看: 265

专题相关文章:

从内存可见性看Volatile、原子变量和CAS算法 多线程并发之CountDownLatch(闭锁)使用详解 多线程并发之显示锁Lock与其通信方式Condition源码解读 多线程并发之读写锁(ReentranReadWriteLock&ReadWriteLock)使用详解 多线程并发之线程池Executor与Fork/Join框架 多线程并发之JUC 中的 Atomic 原子类总结 多线程并发之volatile的底层实现原理 多线程并发之Semaphore(信号量)使用详解

【1】CountDownLatch是什么

CountDownLatch,英文翻译为倒计时锁存器,是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行:

确保某个计算在其需要的所有资源都被初始化之后才继续执行;确保某个服务在其依赖的所有其他服务都已经启动之后才启动;等待直到某个操作所有参与者都准备就绪再继续执行。

CountDownLatch有一个正数计数器,countDown()方法对计数器做减操作,await()方法等待计数器达到0。所有await的线程都会阻塞直到计数器为0或者等待线程中断或者超时。

闭锁(倒计时锁)主要用来保证完成某个任务的先决条件满足。是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。

CountDownLatch同样依赖队列同步器AbstractQueuedSynchronizer,其类方法如下: **加粗样式** 其内部类Sync同样继承了AQS并重写了tryAcquireShared和tryReleaseShared方法。同时也可以表明CountDownLatch是基于共享锁模式的。

CountDownLatch 的两种典型用法

①某一线程在开始运行前等待n个线程执行完毕。

将 CountDownLatch 的计数器初始化为n :new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

②实现多个线程开始执行任务的最大并行性。

注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 :new CountDownLatch(1),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。

如下示例,在多线程运行情况下,计算多线程耗费时间:

public class TestCountDownLatch { public static void main(String[] args){ LatchDemo latchDemo = new LatchDemo(); long begin = System.currentTimeMillis(); //多线程 for (int i = 0; i 0,则返回false。如果等待时间时间小于或等于零,方法将不会等待。 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * 将count-1 ,如果count减一后为0 ,则释放所有等待的线程 */ public void countDown() { sync.releaseShared(1); } /** 返回当前的count,该方法通常用在debug或者测试中 */ public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }

简单总结如下:

内部类Sync同样继承AQS;AQS的state代表count;初始化使用计数器count;count代表多个线程执行或者某个操作执行次数;countDown()方法将会将count-1;count为0将会释放所有等待线程;await方法将会阻塞直到count为0;CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。count不为0,但是等待时间过去将会返回false。开关锁应用;问题分解应用–并行性;

③ CountDownLatch Javadoc

一个同步助手使一个或多个线程在其他线程完成一系列操作前一直处于等待状态。CountDownLatch使用一个count初始化。await方法将会使线程阻塞直到由于countDown方法导致count计数达到0。如果count减为0,所有等到的线程将会被释放并立即返回到后续调用。这是一次性的现象,不能重置计数。如果需要重新设置count,可以使用CyclicBarrier–栅栏。参考博文:多线程并发之CyclicBarrier(栅栏)使用详解

CountDownLatch是一个多能力的同步工具可以被使用到许多目的。

使用 one 作为count值初始化的CountDownLatch可以被用作开关锁或着门:所有线程在这个门前等待(await方法)直到某个线程调用countDown将门打开。使用N作为count值初始化的CountDownLatch可以被用于使某个线程等待N个线程完成某个动作或者某个动作被完成N次。

CountDownLatch的一个有用特性是,它不要求调用countDown的线程在继续之前等待计数达到零,它只是防止任何线程通过直到所有线程都可以通过。

④ 官方实例一

这里有两个类,其中一组worker使用两个倒计时锁存器:第一个锁是一个开始信号–阻止任何worker直到driver准备好;第二个锁是一个完成信号–允许driver等待直到所有的worker都完成。

class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); doSomethingElse(); // don't let run yet startSignal.countDown(); // let all threads proceed doSomethingElse(); doneSignal.await(); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... } }}

⑤ 官方实例二 另一个典型的用法是将问题分成N个部分,用Runnable描述每个部分,Runnable执行该部分并在锁存器上倒计时,并将所有Runnable排队给Executor。当所有子部件完成时,协调线程将能够通过等待(如果需要使用这种形式,建议使用CyclicBarrier)。

class Driver2 { // ... void main() throws InterruptedException { CountDownLatch doneSignal = new CountDownLatch(N); Executor e = Executors.newScheduledThreadPool(N) for (int i = 0; i < N; ++i) // create and start threads e.execute(new WorkerRunnable(doneSignal, i)); doneSignal.await(); // wait for all to finish } } class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; private final int i; WorkerRunnable(CountDownLatch doneSignal, int i) { this.doneSignal = doneSignal; this.i = i; } public void run() { try { doWork(i); doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... } }} 【3】CountDownLatch中Sync使用AQS的相关方法

① await()方法中的acquireSharedInterruptibly(1)

public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

AQS中acquireSharedInterruptibly(1)源码如下:

/** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //如果小于0,就执行获取操作 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }

② await(long timeout, TimeUnit unit)中的tryAcquireSharedNanos(1, unit.toNanos(timeout))

public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }

AQS中tryAcquireSharedNanos(int arg, long nanosTimeout)源码如下:

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //很有意思,这里用了 || return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }

③ countDown()中的releaseShared(int arg)

public void countDown() { sync.releaseShared(1); }

AQS中releaseShared(int arg)源码如下:

public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3