0
点赞
收藏
分享

微信扫一扫

JUC中等待多线程完成的工具类CountDownLatch,必备技能

三次方 2021-09-28 阅读 103
日记本

本篇内容

1.介绍CountDownLatch及使用场景
2.提供几个实例介绍CountDownLatch的使用
3.手写一个并行处理的工具类

CountDownLatch介绍

CountDownLatch称之为闭锁,它可以使一个或一批线程在闭锁上等待,等到其他线程执行完相应操作后,闭锁打开,这些等待的线程才可以继续执行。确切的说,闭锁在内部维护了一个倒计数器。通过该计数器的值来决定闭锁的状态,从而决定是否允许等待的线程继续执行。
常用方法:

  • public CountDownLatch(int count):构造方法,count表示计数器的值,不能小于0,否者会报异常。

  • public void await() throws InterruptedException:调用await()会让当前线程等待,直到计数器为0的时候,方法才会返回,此方法会响应线程中断操作。

  • public boolean await(long timeout, TimeUnit unit) throws InterruptedException:限时等待,在超时之前,计数器变为了0,方法返回true,否者直到超时,返回false,此方法会响应线程中断操作。

  • public void countDown():让计数器减1

CountDownLatch使用步骤:

  • 创建CountDownLatch对象
  • 调用其实例方法 await(),让当前线程等待
  • 调用 countDown()方法,让计数器减1
  • 当计数器变为0的时候, await()方法会返回

举例说明

我们使用CountDownLatch来完成上面示例中使用join实现的功能,代码如下:

package com.shiguiwu.springmybatis.thread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @description: CountDownLatch 闭锁
 * @author: stone
 * @date: Created by 2021/6/21 17:04
 * @version: 1.0.0
 * @pakeage: com.shiguiwu.springmybatis.thread
 */
public class CountDownLatchTests {

    //教练>>>>>>>>>>>>>>>>
    private static CountDownLatch commanderCd  = new CountDownLatch(1);

    //三个运动员
    private static CountDownLatch sportsman = new CountDownLatch(3);


    public static void main(String[] args) throws Exception{
        testCountDownLatch();


    }

    public static void testCountDownLatch() throws Exception {
        new Thread(() -> {
            try {
                commanderCd.await();
                System.out.println(Thread.currentThread().getName() + "=========>开始跑了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                sportsman.countDown();
                System.out.println(Thread.currentThread().getName() + "=========>达到终点");

            }


        },"小明").start();

        new Thread(() -> {
            try {
                commanderCd.await();
                System.out.println(Thread.currentThread().getName() + "=========>开始跑了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                sportsman.countDown();
                System.out.println(Thread.currentThread().getName() + "=========>达到终点");

            }


        },"小红").start();

        new Thread(() -> {
            try {
                commanderCd.await();
                System.out.println(Thread.currentThread().getName() + "=========>开始跑了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                sportsman.countDown();
                System.out.println(Thread.currentThread().getName() + "=========>达到终点");

            }


        }, "小武").start();
        TimeUnit.SECONDS.sleep(5);
        System.out.println("预备===============>");
        commanderCd.countDown();
        sportsman.await();
        System.out.println("所以的运动员都跑完===============================>");

    }
}

代码中,t1、t2、t3启动之后,都阻塞在 commanderCd.await();,主线程模拟发枪准备操作耗时5秒,然后调用 commanderCd.countDown();模拟发枪操作,此方法被调用以后,阻塞在 commanderCd.await();的3个线程会向下执行。主线程调用 countDownLatch.await();之后进行等待,每个人跑完之后,调用 countDown.countDown();通知一下 countDownLatch让计数器减1,最后3个人都跑完了,主线程从 countDownLatch.await();返回继续向下执行。

手写并发工具

package com.shiguiwu.springmybatis.thread;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * @description: 手写并发工具
 * @author: stone
 * @date: Created by 2021/6/24 9:42
 * @version: 1.0.0
 * @pakeage: com.shiguiwu.springmybatis.thread
 */
public class TaskDisposeUtils {

    //并发数
    public static final int POOL_SIZE;
    static {
        POOL_SIZE = Integer.max(Runtime.getRuntime().availableProcessors(), 5);
    }

    /**
     * 并发处理
     * @param taskList 任务列表
     * @param consumer 消费者
     * @param <T> 类型
     * @throws InterruptedException 异常
     */
    public static <T> void dispose( List<T> taskList, Consumer<T> consumer) throws InterruptedException {
        dispose(true, POOL_SIZE, taskList, consumer);
    }

    /**
     * 并发处理
     * @param isMoreThread 是否并发
     * @param poolSize 线程数
     * @param taskList 任务列表
     * @param consumer 消费者
     * @param <T> 类型
     * @throws InterruptedException 异常
     */
    public static <T> void dispose(boolean isMoreThread, int poolSize, List<T> taskList, Consumer<T> consumer) throws InterruptedException {
        if (isMoreThread && poolSize > 1) {
            poolSize = Math.min(poolSize, taskList.size());
            ExecutorService service = null;
            try {
                service = Executors.newFixedThreadPool(poolSize);

                ExecutorService finalService = service;
                CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
                taskList.forEach(t -> finalService.execute(() -> {
                    try {
                        consumer.accept(t);
                    } finally {
                        countDownLatch.countDown();
                    }

                }));
                countDownLatch.await();
            }  finally {
                if (service != null) {
                    service.shutdown();
                }
            }
        }
        else {
            taskList.forEach(consumer::accept);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> list = Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList());
        dispose(list, l-> {
            try {
                TimeUnit.SECONDS.sleep(l);
                System.out.println("结束=======================>");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

}

TaskDisposeUtils是一个并行处理的工具类,可以传入n个任务内部使用线程池进行处理,等待所有任务都处理完成之后,方法才会返回。比如我们发送短信,系统中有1万条短信,我们使用上面的工具,每次取100条并行发送,待100个都处理完毕之后,再取一批按照同样的逻辑发送。

举报

相关推荐

0 条评论