0
点赞
收藏
分享

微信扫一扫

Semaphore 信号量基本使用以及核心源码解析


概述

Semaphore,俗称信号量,它是操作系统中 ​​PV​​操作的原语在 java 的实现,它也是基于AbstractQueuedSynchronizer 实现的。Semaphore 的功能非常强大,大小为 1 的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量。

运用

使用场景:

限流场景,比如我一个景区,在高峰时期最多容纳 2000 人,如果一天有 2030 想去进去,我们需要等前面的 2000 人出来一个人,才能放另外的其他人进去。

Semaphore 信号量基本使用以及核心源码解析_限流

测试代码:

@Slf4j
public class SemaphoreTest {

// 线程池
private static ExecutorService executorService
= Executors.newFixedThreadPool(10);
// 限制只有 5 个线程能够同时获取锁
private static Semaphore semaphore = new Semaphore(5);

public static void main(String[] args) throws InterruptedException {
for (; ; ) {
Thread.sleep(100);
executorService.submit(() -> exec());
}
}

public static void exec() {
try {
semaphore.acquire(1);
log.info("执行 exec 方法");
TimeUnit.SECONDS.sleep(2);
} catch (Throwable e) {
e.printStackTrace();
} finally {
semaphore.release(1);
}
}
}

执行效果

Semaphore 信号量基本使用以及核心源码解析_后端_02

我们从执行结果上来看,2秒钟之内,只有 5 个执行中的线程。

源码

我们先看看,​​Semaphore​​为我们提供了那些方法可以供我们使用

构造方法

它提供了两个构造方法:​​Semaphore(int permits)​​​和 ​​Semaphore(int permits, boolean fair)​

两个参数解释:

  • permits 表示许可证的数量(共享资源数)
  • fair 表示是否为公平锁(默认非公平)

Semaphore 信号量基本使用以及核心源码解析_信号量_03

其他方法

  • ​acquire()​​表示阻塞并获取许可tryAcquire()方法在没有许可的情况下会立即返回false,要获取许可的线程不会阻塞release()表示释放许可;
  • ​intavailablePermits()​​:返回此信号量中当前可用的许可证数;
  • ​intgetQueueLength()​​:返回正在等待获取许可证的线程数;
  • ​booleanhasQueuedThreads()​​:是否有线程正在等待获取许可证;
  • ​voidreducePermit(intreduction)​​:减少 reduction个许可证
  • ​Collection#getQueuedThreads()​​:返回所有等待获取许可证的线程集合

原理

Semaphore 的本质是 AQS 的共享锁实现,我们先看看它的构造方法、获取信号、释放信号的逻辑。

获取许可

以扇面的代码为例:

semaphore.acquire(1);

Semaphore 信号量基本使用以及核心源码解析_后端_04

最终会调用,(注意:默认非公平)

final int nonfairTryAcquireShared(int acquires) {
// 循环
for (;;) {
// 获取 state
int available = getState();
// 剩余资源数 = 有效许可 - 当前请求许可
int remaining = available - acquires;
// 剩余资源数 < 0 || 或者修改成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

如果获取锁失败,那么就执行 ​​doAcquireSharedInterruptibly(arg);​​入队。

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 创建共享节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
if (p == head) {
// 如果是队列头节点,再次尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 清除无用的节点并且挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// 获取失败并且移除节点
cancelAcquire(node);
}
}

释放许可

释放许可的方法为 ​​release​​, 其源码如下:

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
// 成功
return true;
}
// 失败
return false;
}

​tryReleaseShared​​代码如下:

protected final boolean tryReleaseShared(int releases) {
// 自旋
for (;;) {
// 获取 state
int current = getState();
// 机上需要释放的信号量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// cas 修改 state
if (compareAndSetState(current, next))
return true;
}
}

​doReleaseShared​​代码如下:

private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
// 循环
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒下一个节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

总结

Semaphore 是在 AQS 基础之上实现的一个工具,Semaphore 的剩余许可量是通过 AQS 中的 state 属性进行的记录,获取许可是将该值进行减少,释放许可是将该值进行增加,当没有足够的许可时,线程会加入到阻塞队列中等待其他线程释放许可并唤醒。

举报

相关推荐

0 条评论