Semaphore信号量
一、信号量概念
信号量是操作系统的PV原语在java中的实现,可以指定有限资源个数,线程获取资源,则资源个数减1;线程释放资源,则资源个数加1。信号量为1则类似互斥锁。信号量的主要功能是实现限流,它只允许同时有n个线程同时获取信号量。其加锁解锁逻辑是基于共享锁的实现。
二、常用方法
//构造方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
//获取资源,资源数减1
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//获取指定资源数
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
//尝试获取资源,返回true or false,不会阻塞线程
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
//释放资源,资源数加1
public void release() {
sync.releaseShared(1);
}
//释放指定数量的资源
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
三、获取资源、释放资源逻辑分析
获取资源源码分析
以acquire() 方法作为入口
此处如果线程设置中断标志,将在这里抛出中断异常。
进入Semaphore.NonfairSync.tryAcquireShared(int acquires)方法获取资源
自旋获取共享信号量,直到信号量不足或者CAS获取资源成功后退出循环
如果当前尝试信号量,资源数小于0,则进入doAcquireSharedInterruptibly(arg)方法准备进行入队阻塞
进入addWaiter(Node mode)方法进行入队
进入enq(Node node)方法,在这方法里使用了for (;;)循环去保证节点的入队操作。
第一次循环做的操作为红色,即生成一个空节点作为头尾节点;第二次循环即将当前节点入队,并修改前驱后继指针的指向,将插入的节点设置为尾节点。
入队成功后,准备进行阻塞操作。在阻塞操作之前,再次尝试获取资源,如果没获取成功,设置同步阻塞队列的节点的waitStatus,去尝试阻塞。
至此,获取不到信号量的线程进入阻塞状态,直到别的线程释放资源,调用release()方法,再去唤醒同步等待队列里的线程。那就让我们看一看释放资源如何唤醒线程吧
释放资源源码分析
以Semaphore.release()作为入口,调用了sync.releaseShared(1)。
在这个方法中尝试去释放资源。通过CAS替换state的值,CAS成功,即释放信号量成功,执行doReleaseShared()
在阻塞线程之前我们已经将唤醒节点的前趋节点的waitStatus置为-1,此时再CAS前趋节点的waitStatus,置为0;CAS成功进入unparkSuccessor(h) 方法唤醒线程。
**线程唤醒之后,再次准备获取资源,进行CAS操作。**被阻塞的线程继续执行doAcquireSharedInterruptibly(int arg)方法。此时CAS成功,r为信号量,如果r>=0,表示获取资源成功,则进行节点的出队。
并且在出队的过程中,对后续节点判断,mode是否是SHARED,如果是,进行唤醒。此处为共享锁唤醒的特点。
Semaphore的获取,释放资源的源码分析就到此。
四、同步等待队列在获取释放资源的流程图
五、Semaphore使用举例
public class SemaphoreTest {
private Semaphore semaphore = new Semaphore(3);
private ExecutorService executorService = Executors.newFixedThreadPool(10);
public void count() {
for (int i = 0; i < 10; i++) {
int finalI = i;
executorService.execute(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程" + finalI + "获取信号量");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
semaphore.release();
});
}
}
public static void main(String[] args) {
SemaphoreTest semaphoreTest = new SemaphoreTest();
semaphoreTest.count();
}
}