一 模拟业务场景
某天,可爱的产品经理对你提出一个需求,每个月末导出公司几个大部门下的每个员工的加班时长统计数据;假设大部门只有几个,但是每个大部门下的员工数量成千上万,但是可爱的产品经理不想每次导出数据都得等好久,你会如何设计开发?
二 CountDownLatch 介绍
CountDownLatch是java1.5之后被引入的,是java.util.concurrent包下的一个同步工具类,它允许一个或多个线程等待,直到其他线程中一组操作执行完成。
CountDownLatch主要有countDown方法和await方法。初始化CountDownLatch对象时,需要指定一个整数作为计数器的初始值,这个值一般是需要等待完成某些操作的线程的数量值。当调用countDown方法时,计数器的值就减1;而调用await方法线程,如果此时计数器的值大于0,则此线程会被阻塞,直到计数器的值被countDown方法减到0时,线程才会继续执行。计数器是无法重置的,当计数器的值被减到0时,调用await方法都会直接返回。
调用countDown方法的线程可以继续执行,不需要等待计数器被减到0,只有调用await方法的线程才需要等待。
三 CountDownLatch 源码分析
CountDownLatch底层是基于AbstractQueuedSynchronizer实现的,它持有一个内部类对象Sync sync,这个内部类继承抽象类AbstractQueuedSynchronizer,CountDownLatch只有一个构造函数CountDownLatch(int count),这个count值最终直接赋给AQS的state变量,并且保证了修改state的可见性和原子性。
public class CountDownLatch {
/**
* CountDownLatch的内部类Sync
* 其中维护了一个整数state,并且保证了修改state的可见性和原子性
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
// 持有内部类对象
private final Sync sync;
// 唯一构造函数,创建CountDownLatch实例时,也会创建一个Sync的实例,
// 同时把计数器的值传给Sync实例,最终赋值给AOS的state变量
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// await方法中调用了Sync实例的acquireSharedInterruptibly方法
// 调用此方法的线程会被挂起,直到count值为0才继续执行
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 带有等待时间的await方法
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 在countDown方法中,只调用了Sync实例的releaseShared方法,
// 将计数器的值减1
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
}
复制代码
/**
* The synchronization state.
* 这是 AbstractQueuedSynchronizer 类中的一个整数变量 state,
* 并且保证了修改state的可见性和原子性。
*/
private volatile int state;
CountDownLatch类就是这么简单,只有几个方法。每次调用countDown()方法时,都会调用AQS的的releaseShared方法,它先对计数器的值进行减1操作,如果减1后的计数器为0,则唤醒被await方法阻塞的所有线程。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //对计数器进行减一操作
doReleaseShared(); //如果计数器为0,唤醒被await方法阻塞的所有线程
return true;
}
return false;
}
而对计数器的值减1操作则利用了CAS操作,直到获取到锁就进行减1操作,如下:
protected boolean tryReleaseShared(int releases) {
for (;;) {//死循环,如果CAS操作失败就会不断继续尝试。
int c = getState();//获取当前计数器的值。
if (c == 0)// 计数器为0时,就直接返回。
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))// 使用CAS方法对计数器进行减1操作
return nextc == 0;//如果操作成功,返回计数器是否为0
}
}
而调用await方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。
// 判断计数器是否为 0,如果不为 0 则阻塞当前线程
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// Sync重写了AbstractQueuedSynchronizer中的模板方法tryAcquireShared,
// 其主要是判断计数器的值是否为零,如果为零则返回1,如果不为零则返回-1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
四 模拟业务解决
既然我们已经明白CountDownLatch的原理了,那我们就可以解决文章开头提出的业务需求了。即每个大部门开一个线程去统计员工的加班时长,等所有线程统计完成后,再把所有统计数据填充到excel导出。
下面我们假设只有2个大部门,实际有多个部门则count的值就为多少,不过实际要开多少线程还是得根据自己实际情况评估,切记不可盲目开大量的线程。线程的使用也可以通过线程池进行控制,此为演示方便不使用线程池。
package com.nobody;
import java.util.concurrent.CountDownLatch;
/**
* @Description CountDownLatch演示
* @Author Mr.nobody
* @Date 2021/3/21
* @Version 1.0
*/
public class TestCountDownLatch {
public static void main(String[] args) throws InterruptedException {
// 初始化CountDownLatch实例
CountDownLatch countDownLatch = new CountDownLatch(2);
// 模拟统计A部门人员加班时长
new Thread(() -> {
System.out.println("开始统计A部门人员加班时长情况...");
try {
// 模拟处理数据
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束统计A部门人员加班时长情况...");
countDownLatch.countDown();
}).start();
// 模拟统计B部门人员加班时长
new Thread(() -> {
System.out.println("开始统计B部门人员加班时长情况...");
try {
// // 模拟处理数据
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束统计B部门人员加班时长情况...");
countDownLatch.countDown();
}).start();
Thread.sleep(100);
System.out.println("等待所有部门统计...");
countDownLatch.await();
System.out.println("所有部门统计结束,导出数据...");
}
}
启动程序,输出结果如下:
开始统计B部门人员加班时长情况...
开始统计A部门人员加班时长情况...
等待所有部门统计...
结束统计B部门人员加班时长情况...
结束统计A部门人员加班时长情况...
所有部门统计结束,导出数据...
复制代码
如果某些线程因为数据量大,或者服务调用链时间长,统计很久还没出结果怎么办?难道要一直等待?那可以将await()方法换为await(long timeout, TimeUnit unit)方法,即超过设定的时间就不再阻塞等待。
System.out.println("等待所有部门统计...");
countDownLatch.await(3000, TimeUnit.MILLISECONDS);
System.out.println("所有部门统计结束,导出数据...");
再启动程序,输出结果如下,因为A部门统计线程执行时间超过了等待时间,所以没有等待A部门统计线程执行完,就继续执行主线程了。不过对于超时未完成的线程,自己要根据情况是否要做特殊处理了。
开始统计A部门人员加班时长情况...
开始统计B部门人员加班时长情况...
等待所有部门统计...
结束统计B部门人员加班时长情况...
所有部门统计结束,导出数据...
结束统计A部门人员加班时长情况...
以上演示的只是单个线程等待多个线程的情况,其实多个线程等待单个线程的情况也是常见的,例如运动会上,多个运动员(多个线程)准备就绪待跑,只有裁判(单个线程)鸣枪,运动员才开始跑。
其实juc包下有另外一个工具类CyclicBarrier,也是有线程等待的作用,不过它是实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作;它的计数器是递增的,还有重置功能,可以多次使用。