本篇内容
1.介绍CountDownLatch及使用场景
2.提供几个实例介绍CountDownLatch的使用
3.手写一个并行处理的工具类
CountDownLatch介绍
CountDownLatch称之为闭锁,它可以使一个或一批线程在闭锁上等待,等到其他线程执行完相应操作后,闭锁打开,这些等待的线程才可以继续执行。确切的说,闭锁在内部维护了一个倒计数器。通过该计数器的值来决定闭锁的状态,从而决定是否允许等待的线程继续执行。
常用方法:
public CountDownLatch(int count):构造方法,count表示计数器的值,不能小于0,否者会报异常。
public void await() throws InterruptedException:调用await()会让当前线程等待,直到计数器为0的时候,方法才会返回,此方法会响应线程中断操作。
public boolean await(long timeout, TimeUnit unit) throws InterruptedException:限时等待,在超时之前,计数器变为了0,方法返回true,否者直到超时,返回false,此方法会响应线程中断操作。
public void countDown():让计数器减1
CountDownLatch使用步骤:
- 创建CountDownLatch对象
- 调用其实例方法 await(),让当前线程等待
- 调用 countDown()方法,让计数器减1
- 当计数器变为0的时候, await()方法会返回
举例说明
我们使用CountDownLatch来完成上面示例中使用join实现的功能,代码如下:
package com.shiguiwu.springmybatis.thread;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @description: CountDownLatch 闭锁
* @author: stone
* @date: Created by 2021/6/21 17:04
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.thread
*/
public class CountDownLatchTests {
//教练>>>>>>>>>>>>>>>>
private static CountDownLatch commanderCd = new CountDownLatch(1);
//三个运动员
private static CountDownLatch sportsman = new CountDownLatch(3);
public static void main(String[] args) throws Exception{
testCountDownLatch();
}
public static void testCountDownLatch() throws Exception {
new Thread(() -> {
try {
commanderCd.await();
System.out.println(Thread.currentThread().getName() + "=========>开始跑了");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
sportsman.countDown();
System.out.println(Thread.currentThread().getName() + "=========>达到终点");
}
},"小明").start();
new Thread(() -> {
try {
commanderCd.await();
System.out.println(Thread.currentThread().getName() + "=========>开始跑了");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
sportsman.countDown();
System.out.println(Thread.currentThread().getName() + "=========>达到终点");
}
},"小红").start();
new Thread(() -> {
try {
commanderCd.await();
System.out.println(Thread.currentThread().getName() + "=========>开始跑了");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
sportsman.countDown();
System.out.println(Thread.currentThread().getName() + "=========>达到终点");
}
}, "小武").start();
TimeUnit.SECONDS.sleep(5);
System.out.println("预备===============>");
commanderCd.countDown();
sportsman.await();
System.out.println("所以的运动员都跑完===============================>");
}
}
代码中,t1、t2、t3启动之后,都阻塞在 commanderCd.await();,主线程模拟发枪准备操作耗时5秒,然后调用 commanderCd.countDown();模拟发枪操作,此方法被调用以后,阻塞在 commanderCd.await();的3个线程会向下执行。主线程调用 countDownLatch.await();之后进行等待,每个人跑完之后,调用 countDown.countDown();通知一下 countDownLatch让计数器减1,最后3个人都跑完了,主线程从 countDownLatch.await();返回继续向下执行。
手写并发工具
package com.shiguiwu.springmybatis.thread;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* @description: 手写并发工具
* @author: stone
* @date: Created by 2021/6/24 9:42
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.thread
*/
public class TaskDisposeUtils {
//并发数
public static final int POOL_SIZE;
static {
POOL_SIZE = Integer.max(Runtime.getRuntime().availableProcessors(), 5);
}
/**
* 并发处理
* @param taskList 任务列表
* @param consumer 消费者
* @param <T> 类型
* @throws InterruptedException 异常
*/
public static <T> void dispose( List<T> taskList, Consumer<T> consumer) throws InterruptedException {
dispose(true, POOL_SIZE, taskList, consumer);
}
/**
* 并发处理
* @param isMoreThread 是否并发
* @param poolSize 线程数
* @param taskList 任务列表
* @param consumer 消费者
* @param <T> 类型
* @throws InterruptedException 异常
*/
public static <T> void dispose(boolean isMoreThread, int poolSize, List<T> taskList, Consumer<T> consumer) throws InterruptedException {
if (isMoreThread && poolSize > 1) {
poolSize = Math.min(poolSize, taskList.size());
ExecutorService service = null;
try {
service = Executors.newFixedThreadPool(poolSize);
ExecutorService finalService = service;
CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
taskList.forEach(t -> finalService.execute(() -> {
try {
consumer.accept(t);
} finally {
countDownLatch.countDown();
}
}));
countDownLatch.await();
} finally {
if (service != null) {
service.shutdown();
}
}
}
else {
taskList.forEach(consumer::accept);
}
}
public static void main(String[] args) throws InterruptedException {
List<Integer> list = Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList());
dispose(list, l-> {
try {
TimeUnit.SECONDS.sleep(l);
System.out.println("结束=======================>");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
TaskDisposeUtils是一个并行处理的工具类,可以传入n个任务内部使用线程池进行处理,等待所有任务都处理完成之后,方法才会返回。比如我们发送短信,系统中有1万条短信,我们使用上面的工具,每次取100条并行发送,待100个都处理完毕之后,再取一批按照同样的逻辑发送。