0
点赞
收藏
分享

微信扫一扫

【JUC系列】同步工具类之CyclicBarrier

水沐由之 2022-05-03 阅读 43
java

同步屏障 CyclicBarrier

文章目录


"循环屏障"是一种同步辅助工具,它允许一组线程相互等待以达到共同的屏障点。 CyclicBarriers 在涉及固定大小的线程组的程序中很有用,这些线程组必须偶尔相互等待。 屏障被称为循环的,因为它可以在等待线程被释放后重新使用。

CyclicBarrier 支持一个可选的 Runnable 命令,该命令在每个屏障点运行一次,在队伍中的最后一个线程到达之后,但在任何线程被释放之前。 此屏障操作对于在任何一方继续之前更新共享状态很有用。
在这里插入图片描述

示例

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.*;

/**
 * 分布计算再合并
 */
public class CyclicBarrierDemo2 implements Runnable {

    /**
     * 当最后一个线程到达屏障点,执行此方法,汇总各个线程的值
     */
    @Override
    public void run() {
        int result = 0;
        // 遍历
        for (Map.Entry<String, Integer> ss : s.entrySet()) {
            result += ss.getValue();
        }
        s.put("result", result);
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] count value [" + result + "].");
    }

    /**
     * 定义一个需要4个线程的同步屏障,并在最后一个到达屏障的线程后,执行当前类的run方法
     */
    private final CyclicBarrier c = new CyclicBarrier(4, this);

    /**
     * 4个线程的线程池
     */
    private final Executor executor = Executors.newFixedThreadPool(4);

    /**
     * 用来存放各个线程运算结果的值
     */
    private final ConcurrentHashMap<String, Integer> s = new ConcurrentHashMap<>();

    /**
     * 计算api 4个线程分别向变量s中塞值
     */
    private void count() {
        for (int i = 0; i < 4; i++) {
            int finalI = i;
            executor.execute(() -> {
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] put value [" + finalI + "] and await.");
                s.put(Thread.currentThread().getName(), finalI);
                try {
                    c.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
    }

    public static void main(String[] args) {
        CyclicBarrierDemo2 cyclicBarrierDemo2 = new CyclicBarrierDemo2();
        cyclicBarrierDemo2.count();
    }
}

执行结果

[16:51:15--pool-1-thread-1] put value [0] and await.
[16:51:15--pool-1-thread-3] put value [2] and await.
[16:51:15--pool-1-thread-4] put value [3] and await.
[16:51:15--pool-1-thread-2] put value [1] and await.
[16:51:15--pool-1-thread-2] count value [6].

核心思想

内部组合了非公平策略的重入锁,借助AQS实现线程的阻塞和唤醒,主要依赖条件队列。关于AQS的Condition。

组成

构造函数

    // 创建一个新的 CyclicBarrier,它将在给定数量的参与方(线程)等待它时触发,并且在触发障碍时不执行预定义的操作。 parties代表线程数
	public CyclicBarrier(int parties) {
        this(parties, null);
    }

	// 创建一个新的 CyclicBarrier,当给定数量的参与方(线程)正在等待它时,它将触发,并且当障碍被触发时,它将执行给定的屏障动作,由最后一个进入屏障的线程执行。barrierAction代表当屏障被触发时执行的命令,如果没有动作则为 null
    public CyclicBarrier(int parties, Runnable barrierAction) {
        // 参与者必须大于0
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

内部类 Generation

屏障的每次使用都表示为一个生成实例。每当栅栏被触发或重置时,实例就会发生变化。可以有许多代(轮次)与使用屏障的线程相关联

  • 由于锁定可能分配给等待线程的非确定性方式
  • 但一次只能激活其中一个(应用计数的那个)并且所有其余的要么坏要么绊倒。

如果有中断但没有后续重置,则不需要活动生成实例。

    private static class Generation {
        boolean broken = false;
    }

成员变量

// 用于保护障碍入口的锁
private final ReentrantLock lock = new ReentrantLock();
// 条件对象
private final Condition trip = lock.newCondition();
// 参与线程的数量
private final int parties;
// 由最后一个进入 barrier 的线程执行的操作
private final Runnable barrierCommand;
// 正在等待进入屏障的线程数量
private int count;

核心方法

方法名描述
int getParties()返回触发此障碍所需的参与方数量。
int await() throws InterruptedException, BrokenBarrierException等待所有线程都在障碍点上调用了await。
如果当前线程不是最后到达的,则出于线程调度目的将其禁用并处于休眠状态,直到发生以下情况之一:
1.最后一个线程到达
2.其他线程中断当前线程
3.其他一些线程中断了其他等待线程之一
4.其他一些线程在等待屏障时超时
5.其他一些线程在此屏障上调用重置
如果当前线程:
1.在进入此方法时设置其中断状态
2.等待时被打断
会抛出 InterruptedException 并清除当前线程的中断状态。

如果在任何线程等待时屏障被重置,或者在调用 await 时或者在任何线程正在等待时屏障被破坏,则抛出 BrokenBarrierException。

如果任何线程在等待时被中断,那么所有其他等待的线程都会抛出 BrokenBarrierException 并且屏障处于损坏状态
如果当前线程是最后到达的线程,并且在构造函数中提供了非空屏障操作,则当前线程在允许其他线程继续之前运行该操作。 如果在屏障操作期间发生异常,则该异常将在当前线程中传播,并且屏障处于损坏状态。
int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException在 await() 的基础上增加了等待超时时长,如果指定的等待时间过去,则抛出 TimeoutException。 如果时间小于或等于零,则该方法根本不会等待。
boolean isBroken()查询此屏障是否处于损坏状态。
void reset()将屏障重置为其初始状态。 如果任何一方当前在屏障处等待,他们将返回一个 BrokenBarrierException。 请注意,由于其他原因发生破损后的重置可能会很复杂; 线程需要以其他方式重新同步,并选择一个执行重置。 相反,最好为后续使用创建一个新的屏障。
int getNumberWaiting()返回当前在屏障处等待的参与方数量。 此方法主要用于调试和断言。

dowait(boolean timed, long nanos)

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 保存当前锁
        final ReentrantLock lock = this.lock;
        // 争夺锁
        lock.lock();
        try {
            // 保存代
            final Generation g = generation;

            // 检查屏障是否损坏
            if (g.broken)
                throw new BrokenBarrierException();

            // 检查线程是否中断
            if (Thread.interrupted()) {
                // 损坏当前屏障,并且唤醒所有的线程,只有拥有锁的时候才会调用
                breakBarrier();
                throw new InterruptedException();
            }

            // 减少等待进入屏障的线程数
            int index = --count;
            // index为0,代表定义的所需线程已经都到达屏障
            if (index == 0) {  // tripped
                // 执行动作的标识
                boolean ranAction = false;
                try {
                    // 是否有屏障命令
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 此函数在所有线程进入屏障后会被调用,即生成下一个版本,所有线程又可以重新进入到屏障中
                    nextGeneration();
                    return 0;
                } finally {
                    // 若屏障命令执行失败 需要损坏当前屏障
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            // 无限循环-直到需要出现以下情况之一:
            // 1.线程都到达屏障
            // 2.屏障损坏
            // 3.中断
            // 4.等待超时
            for (;;) {
                try {
                    // 没有设置等待时间-一直等
                    if (!timed)
                        trip.await();
                    // 设置等待时间且没超时,调用超时的等待
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 若当前线程发生中断,需要损坏当代屏障
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                // 再次检查屏障是否损坏
                if (g.broken)
                    throw new BrokenBarrierException();
                // 代数不对 返回下标
                if (g != generation)
                    return index;

                // 设置等待时间,并且已经过了超时时间 损坏屏障 抛出超时异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

nextGeneration()

    private void nextGeneration() {
        // 唤醒所有线程
        trip.signalAll();
        // 重置屏障执行的所需的线程数
        count = parties;
        // 新生一代
        generation = new Generation();
    }

breakBarrier()

    private void breakBarrier() {
        // 当前代屏障损坏标识
        generation.broken = true;
        // 恢复正在等待进入屏障的线程数量
        count = parties;
        // 唤醒所有线程
        trip.signalAll();
    }
举报

相关推荐

0 条评论