0
点赞
收藏
分享

微信扫一扫

Java8中JUC包同步工具类深度解析(Semaphore,CountDownLatch,CyclicBarrier,Phaser)

个人主页: 进朱者赤

阿里非典型程序员一枚 ,记录平平无奇程序员在大厂的打怪升级之路。 一起学习Java、大数据、数据结构算法(公众号同名

Java8中JUC包同步工具类深度解析(Semaphore,CountDownLatch,CyclicBarrier,Phaser)_工具类

引言

在Java中,并发编程一直是一个重要的领域,而JDK 8中的java.util.concurrent(JUC)包提供了丰富的同步工具类,帮助开发者更加高效地处理并发问题。本文将分层次、分逻辑地介绍这些同步工具类的底层实现原理、使用方法和源码解析,并给出使用注意事项。

一、Semaphore(信号量)

1. 简介

Semaphore是一种同步工具,它允许一定数量的线程同时访问共享资源。通过控制信号量的许可数量,Semaphore能够实现对共享资源的并发访问限制。

2. 适用场景

Semaphore适用于需要限制并发访问共享资源数量的场景。例如,数据库连接池中的连接数控制,防止过多的请求同时访问数据库;或者在分布式系统中限制某个服务能够处理的并发请求数,以保证服务的稳定性和响应速度。

3. 使用

Semaphore semaphore = new Semaphore(5); // 初始化信号量为5
semaphore.acquire(); // 获取一个许可,若信号量为0则阻塞
// 访问共享资源
semaphore.release(); // 释放一个许可

4. 内部原理及源码解读

内部原理

Semaphore基于AQS(AbstractQueuedSynchronizer)实现,它维护了一个许可计数器。当线程调用acquire()方法时,如果许可计数器大于0,则直接返回;否则线程会被加入等待队列并阻塞。当线程调用release()方法时,许可计数器加一,并尝试唤醒等待队列中的一个线程。

源码解读

Semaphore内部有一个类Sync,它继承了AbstractQueuedSynchronizer。Sync有两个子类:FairSync和NonfairSync,分别用于处理公平和非公平策略。

// Semaphore的构造方法
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

NonfairSyncFairSync中,会重写AQS的tryAcquiretryRelease等方法,来实现对许可计数器的增减操作以及线程的同步。

5. 注意事项

  • 使用Semaphore时,要确保释放的许可数量与获取的数量相匹配,避免造成死锁或资源泄漏。
  • 在高并发场景下,要合理设置信号量的初始值,以平衡资源利用率和并发性能。

二、CountDownLatch(倒计时锁)

1. 简介

CountDownLatch是一种同步工具,它允许一个或多个线程等待其他线程完成操作。通过维护一个计数器,当计数器减至0时,等待的线程将被唤醒。

2. 适用场景

CountDownLatch适用于需要等待一组线程完成某个任务后再继续执行的场景。例如,在启动多个线程进行并行计算时,可以使用CountDownLatch来等待所有线程计算完成后,主线程再进行汇总处理。

3. 使用

CountDownLatch latch = new CountDownLatch(5); // 初始化计数器为5
// ...其他线程执行操作,每完成一个操作调用latch.countDown()
latch.await(); // 当前线程等待,直到计数器减至0

4. 内部原理及源码解读

内部原理

CountDownLatch同样基于AQS实现,它维护了一个计数器。当线程调用countDown()方法时,计数器减一;当计数器减至0时,AQS会唤醒等待队列中的所有线程。

源码解读

CountDownLatch的核心在于AQS的state变量,它代表了计数器的值。

// CountDownLatch的构造方法
public CountDownLatch(int count) {
    // 初始化计数器
    sync = new Sync(count);
}

// Sync是CountDownLatch的内部类,继承了AQS
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 498226498192269037L;

    Sync(int count) {
        setState(count); // 设置AQS的state为初始计数器值
    }

    // ...其他方法,如tryAcquireShared等
}

tryAcquireShared方法中,会检查计数器的值是否为0,如果是则直接返回表示可以获取共享资源,否则将当前线程加入等待队列。当countDown方法被调用时,会调用releaseShared方法减少计数器的值,并尝试唤醒等待队列中的线程。

5. 注意事项

  • 在使用CountDownLatch时,要确保所有需要等待的线程都调用了countDown()方法,并且计数器的初始值设置正确。
  • 等待线程在调用await()方法后会被阻塞,直到计数器减至0,因此要避免在等待过程中执行耗时操作或阻塞操作。

三、CyclicBarrier(循环栅栏)

1. 简介

CyclicBarrier是一种同步工具,它允许一组线程互相等待,直到所有线程都到达某个公共屏障点(barrier point)。一旦所有线程都到达屏障点,它们可以继续执行后续操作。

2. 适用场景

CyclicBarrier适用于需要将一组线程分割成多个阶段,并在每个阶段完成后进行汇总或协调的场景。例如,在多个线程协同完成一个复杂任务时,每个线程负责不同的子任务,当所有线程都完成各自子任务后,再进行下一步操作。

3. 使用

CyclicBarrier cyclicBarrier = new CyclicBarrier(5); // 初始化栅栏,需要5个线程到达
// ...多个线程执行操作,到达屏障点时调用cyclicBarrier.await()
cyclicBarrier.await(); // 当前线程等待,直到所有线程都到达屏障点

4. 内部原理及源码解读

内部原理:

CyclicBarrier内部使用了锁和条件变量来实现线程间的同步。当线程到达屏障点时,首先检查是否有足够的线程到达,如果有则继续执行;否则将线程加入等待队列并阻塞。当最后一个线程到达屏障点时,唤醒所有等待的线程。

源码解读:

CyclicBarrier的核心在于其内部类Generation,它代表了屏障的某个周期。每个Generation都有一个计数器来记录到达屏障点的线程数量。

// CyclicBarrier的构造方法
public CyclicBarrier(int parties, Runnable barrierAction) {
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
    this.lock = new ReentrantLock();
    this.condition = lock.newCondition();
    this.generation = new Generation();
}

// Generation内部类
private static class Generation {
    boolean broken = false;
    int index = 0;
}

await方法中,线程会首先尝试获取锁,然后检查当前Generation的计数器是否为0。如果不为0,则线程会加入等待队列并阻塞。当最后一个线程到达屏障点时,它会修改Generation的计数器并唤醒等待队列中的所有线程。

5. 注意事项

  • 在使用CyclicBarrier时,要确保所有线程都正确调用了await()方法,并且屏障点的线程数量设置正确。
  • 如果在等待过程中发生异常或中断,CyclicBarrier可能会处于不一致状态,因此需要妥善处理异常和中断情况。

四、Phaser(阶段执行器)

1. 简介

Phaser是一种更灵活的同步工具,它提供了对一组线程进行分阶段同步的能力。Phaser允许线程注册、到达、等待和触发不同的阶段,非常适合用于需要动态管理线程阶段执行的场景。

2. 适用场景

Phaser适用于那些需要将线程划分为多个阶段,并在每个阶段结束时执行特定操作的情况。例如,在多阶段任务中,每个阶段可能需要不同的线程数量,且阶段的完成条件可能不同。使用Phaser,可以方便地对这些阶段进行管理和协调。

3. 使用

使用Phaser时,首先需要创建一个Phaser实例,并注册参与线程。然后,在每个阶段,线程可以调用arriveAndAwaitAdvance()方法来表示它们已经完成了当前阶段的工作,并等待其他线程完成。当所有线程都到达当前阶段时,Phaser会触发阶段转换,并允许线程进入下一个阶段。

4. 内部原理及源码解读

内部原理

Phaser内部维护了一个复杂的状态机,包括当前阶段数、已注册的参与者数量、已到达的参与者数量等。每个线程在Phaser中都有一个到达点,当所有线程都到达当前阶段时,Phaser会触发阶段转换,并允许线程进入下一个阶段。

源码解读

Phaser的源码相对复杂,它涉及到了大量的状态和计数器管理。其中,register方法用于注册参与者,arriveAndAwaitAdvance方法用于表示线程到达当前阶段并等待其他线程。在arriveAndAwaitAdvance方法中,会检查当前阶段是否已经完成,如果没有则增加已到达的参与者数量,并可能触发阶段转换。

深入理解Phaser的实现原理,查看和分析其源码是非常有帮助的。由于Phaser的源码较长且复杂,这里我聚焦于其核心机制,而不是完整的实现细节。

public class Phaser {
    // 表示参与者的数量,以及到达的参与者数量等状态信息
    private final AtomicLong state;
    // 用于等待/通知的锁
    private final Object lock;

    // 构造函数,初始化Phaser
    public Phaser() {
        state = new AtomicLong(Phaser.INITIAL_STATE);
        lock = new Object();
    }

    // 注册一个新的参与者,或者为已注册的参与者增加数量
    public void register() {
        // ... 省略具体的实现细节 ...
    }

    // 参与者到达某个阶段,并可能等待其他参与者
    public int arrive() throws InterruptedException {
        // ... 省略具体的实现细节 ...
        return phase;
    }

    // 参与者到达并等待其他参与者,同时推进到下一个阶段
    public int awaitAdvance(int phase) throws InterruptedException {
        // ... 省略具体的实现细节 ...
        return nextPhase;
    }

    // ... 其他方法,如deregister, arriveAndDeregister, bulkRegister, getPhase, getRegisteredParties等 ...

    // 内部状态表示,包含参与者数量和当前阶段等信息
    private static final long UNSET = -1L; // 用于表示未设置的值
    private static final long TERMINATED = Long.MAX_VALUE; // 表示Phaser已经终止
    private static final int MAX_PHASE = Integer.MAX_VALUE; // 最大阶段数
    private static final int PARTIES_MASK = 0xffff; // 参与者数量的掩码
    private static final int PHASE_MASK = ~PARTIES_MASK; // 阶段数的掩码
    private static final long INITIAL_STATE = (UNSET & PHASE_MASK) | (0 & PARTIES_MASK); // 初始状态

    // ... 其他内部方法和变量 ...
}

上面的代码只是一个框架,实际的Phaser实现要复杂得多。不过,通过这个框架,我们可以了解Phaser的一些核心组成部分:

  • 状态维护:Phaser使用一个AtomicLong类型的state变量来维护其内部状态。这个状态包含了当前阶段数、已注册的参与者数量以及已到达的参与者数量等信息。通过使用位操作和掩码,Phaser能够在单个原子变量中高效地存储和更新这些信息。
  • 注册与到达:register()方法用于注册新的参与者或增加已注册参与者的数量。arrive()方法用于表示参与者已经完成了当前阶段的工作,并可能等待其他参与者。这些方法会更新state变量中的相应信息,并根据需要唤醒等待的线程。
  • 等待与推进:awaitAdvance()方法用于等待其他参与者到达当前阶段,并一起进入下一个阶段。这个方法会根据state变量的状态来决定是否需要阻塞调用线程。当所有参与者都到达当前阶段时,Phaser会更新state变量以推进到下一个阶段,并唤醒所有等待的线程。
  • 中断与超时:实际的Phaser实现还支持响应中断和超时。这意味着如果线程在等待过程中被中断或超过指定的等待时间,它可以从等待状态中退出。这些特性是通过在内部使用锁和其他同步机制来实现的。

5. 注意事项

  • 在使用Phaser时,需要确保正确管理线程的注册和注销,避免在阶段转换时出现不一致的情况。
  • Phaser的灵活性也带来了一定的复杂性,因此在使用时需要深入理解其工作原理和使用方法,以避免出现错误或性能问题。

总结

横向对比

以下是以表格形式总结的JDK 8中JUC包中的Semaphore、CountDownLatch、CyclicBarrier和Phaser这四个同步工具类:

工具类

主要用途

内部原理

使用场景

Semaphore

控制访问某个或多个共享资源的线程数量

基于AQS实现,维护一个许可计数器

需要限制并发访问共享资源的场景,如连接池、线程池等

CountDownLatch

允许一个或多个线程等待其他线程完成操作

基于AQS实现,维护一个计数器

用于协调一组线程的执行顺序,例如启动多个线程并行处理任务,并在所有任务完成后执行汇总操作

CyclicBarrier

让一组线程互相等待,直到所有线程都到达某个公共屏障点

使用锁和条件变量实现,维护屏障的周期和计数器

需要一组线程在某个点相互等待的场景,如并行计算中的初始化、数据准备等

Phaser

提供对一组线程进行分阶段同步的能力

维护复杂的状态机,包括阶段数、参与者数量和到达点

适用于需要将线程划分为多个阶段,并在每个阶段结束时执行特定操作的场景,如多阶段任务处理

常见面试题

在面试中,关于JDK 8中JUC包中Semaphore、CountDownLatch、CyclicBarrier和Phaser这四个同步工具类的使用场景,可以提出以下面试题:

Semaphore使用场景面试题:

  • 请描述一个你曾经使用Semaphore解决并发问题的场景。你是如何确定需要的许可数量的?
  • 在高并发环境下,如何使用Semaphore来限制对某个共享资源的访问数量?

CountDownLatch使用场景面试题:

  • 假设你正在开发一个需要等待多个线程完成初始化操作的系统,你会如何使用CountDownLatch来实现?
  • 请分享一个你使用CountDownLatch协调多个线程执行顺序的实例,并解释其工作原理。

CyclicBarrier使用场景面试题:

  • 描述一个适合使用CyclicBarrier的场景,并解释为什么它比其他同步工具类更适合这个场景。
  • 在一个多线程任务中,你需要在所有线程都完成某个阶段后才能进行下一阶段,你会如何使用CyclicBarrier来实现?

Phaser使用场景面试题:

  • 请描述一个需要使用Phaser进行分阶段同步的场景,并解释Phaser在这个场景中的优势。
  • 假设你正在开发一个复杂的多阶段任务,每个阶段需要不同数量的线程来完成,你会如何使用Phaser来管理这些线程的执行?

这些面试题旨在了解候选人对这些同步工具类应用场景的理解以及实际应用经验。通过回答这些问题,候选人可以展示他们对并发编程和JUC工具类的熟悉程度,以及解决实际问题的能力。

这些工具类都提供了灵活的同步机制,可以帮助开发者更好地控制和管理并发程序的执行。根据具体的使用场景和需求,可以选择合适的工具类来实现线程同步和协调。 以上就是JDK 8中JUC包中Semaphore、CountDownLatch、CyclicBarrier和Phaser这四个同步工具类的详细介绍。每个类都有其独特的使用场景和内部原理,了解并正确使用这些工具类,可以大大提高并发编程的效率和稳定性。

欢迎一键三连(关注+点赞+收藏),技术的路上一起加油!!!代码改变世界

  • 关于我:阿里非典型程序员一枚 ,记录平平无奇程序员在大厂的打怪升级之路。 一起学习Java、大数据、数据结构算法(公众号同名


Java8中JUC包同步工具类深度解析(Semaphore,CountDownLatch,CyclicBarrier,Phaser)_工具类_02

举报

相关推荐

0 条评论