0
点赞
收藏
分享

微信扫一扫

并发编程--CountDownLatch


CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。CountDownLatch的一个非常典型的应用场景是:有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。假如我们这个想要继续往下执行的任务调用一个CountDownLatch对象的await()方法,其他的任务执行完自己的任务后调用同一个CountDownLatch对象上的countDown()方法,这个调用await()方法的任务将一直阻塞等待,直到这个CountDownLatch对象的计数值减到0为止。

简单示例:

public class CountDownLatchLearn {

private static int LATCH_SIZE = 5;
private static CountDownLatch doneSignal;
public static void main(String[] args) {
try {
doneSignal = new CountDownLatch(LATCH_SIZE);
// 新建5个任务
for(int i=0; i<LATCH_SIZE; i++)
new InnerThread().start();

System.out.println("main await begin.");
// "主线程"等待线程池中5个任务的完成
doneSignal.await();

System.out.println("main await finished.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class InnerThread extends Thread{
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
// 将CountDownLatch的数值减1
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

说明:创建等待线程数为5,当主线程Main运行到doneSignal.wait()时会阻塞当前线程,直到另外5个线程执行完成之后主线程才会继续执行。

1、构造函数:设置锁标识state的值

CountDownLatch countDownLatch = new  CountDownLatch(5);//实现的操作是设置锁标识state的值为5

2、countDownLatch.await()

调用await的实现是如果state的值不等于0,表示还有其他线程没有执行完(其他线程执行完之后会将state减一操作),此时主线程处于阻塞状态

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

这里acquireSharedInterruptibly会进行state状态判断

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //tryAcquireShared函数用来判断state的值是否等于0
doAcquireSharedInterruptibly(arg);
}

tryAcquireShared中的操作是判断锁标识位state是否等于0,如果不等于0,则调用doAcquireSharedInterruptibly函数,阻塞线程。

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; //判断锁标识位state是否等于0,在构造函数时会给state赋值
}

doAcquireSharedInterruptibly(arg)操作的判断是将当前线程放到FIFO队列中,并将线程阻塞。

//会将线程添加到FIFO队列中,并阻塞
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//将线程添加到FIFO队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//parkAndCheckInterrupt完成线程的阻塞操作
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

3、countDownLatch.countDown();操作是将锁标识位state进行减一操作,如果state此时减一之后为0时则唤起被阻塞线程。

public void countDown() {
sync.releaseShared(1); //将state值进行减一操作
}

releaseShared中完成的操作是将锁标识位state进行减一操作,如果state此时减一之后为0时则唤起被阻塞线程


public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//将锁标识位state进行减arg操作
doReleaseShared();//唤起阻塞线程操作
return true;
}
return false;
}

在tryReleaseShared中会完成state的减值操作。

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
//获取state值
int c = getState();
if (c == 0)
return false;
//进行减一操作
int nextc = c-1;
//cas操作完成state值的修改
if (compareAndSetState(c, nextc))
//如果nextc等于0则返回
return nextc == 0;
}
}

doReleaseShared完成阻塞线程的唤起操作

private void doReleaseShared() {

for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//完成阻塞线程的唤起操作
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

总结:通过CountDownLatch完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待



CountDownLatch源码:

public class CountDownLatch {

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

//设置state值
Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}
//判断state值是否等于0
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//将state锁标识位进行减一操作
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

private final Sync sync;

//count值代表执行等待的次数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

//当count小于1时,wait线程才会继续执行,不然阻塞
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}



public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

//每调用一次count值减1
public void countDown() {
sync.releaseShared(1);
}

public long getCount() {
return sync.getCount();
}


public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}



举报

相关推荐

0 条评论