0
点赞
收藏
分享

微信扫一扫

【JUC系列】LOCK框架系列之四 同步工具类Semaphore

人间四月天i 2022-04-27 阅读 67
java

Semaphore

文章目录


一种并发流程控制的工具类。

用来控制访问特定资源的线程数量,通过协调各个线程,保证合理利用资源。

主要成员

成员变量

/*同步队列*/
private final Sync sync;

内部类

Sync

Sync继承了AQS,并实现了共享锁的获取与释放相关方法。同步控制依赖AQS。详细见AQS的分析.

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        
        // 构造器
        Sync(int permits) {
            setState(permits);
        }
        
        // 获取同步状态值
        final int getPermits() {
            return getState();
        }

        // 非公平式获取共享锁
        final int nonfairTryAcquireShared(int acquires) {
            // 无限循环
            for (;;) {
                // 获取当前同步状态值--可用锁的数量
                int available = getState();
                int remaining = available - acquires;
                // 当CAS操作成功或剩余锁不足的时候,返回退出
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        // 释放共享锁
        protected final boolean tryReleaseShared(int releases) {
            // 无限循环
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        // 减少许可
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
        
        // 一次性去取出所有可用的锁
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

NonfairSync

继承Sync,提供非公平式获取锁

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
        // 构造器,定义锁的数量
        NonfairSync(int permits) {
            super(permits);
        }
        // 非公平式获取共享锁
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

FairSync

继承Sync,提供公平式获取锁

    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        // 构造器,定义锁的数量
        FairSync(int permits) {
            super(permits);
        }

        // 尝试获取锁
        protected int tryAcquireShared(int acquires) {
            // 无限循环
            for (;;) {
                // 同步队列中有其他结点。为了实现公平,如果队列中有比当前线程更早的线程在等待锁,当前线程就不去竞争锁了,直接退出
                if (hasQueuedPredecessors())
                    return -1;
                // 获取可用锁
                int available = getState();
                int remaining = available - acquires;
                // 当CAS操作成功或剩余锁不足的时候,返回退出
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

核心方法

方法名描述
void acquire() throws InterruptedException申请一个许可(减少信号量),线程会被阻塞直到申请到信号量或者当前线程被中断
void acquireUninterruptibly()申请一个许可(减少信号量),线程会被阻塞直到申请到信号量。如果当前线程在等待许可时被中断,那么它将继续等待,但线程被分配许可的时间可能会与它在没有中断发生时收到许可的时间相比发生变化。 当线程确实从此方法返回时,将设置其中断状态。
boolean tryAcquire()以自旋方式尝试获取申请一个许可(减少信号量),方法不会阻塞线程,true代表获取成功,false代表失败。
boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException尝试获取申请一个许可(减少信号量),true代表获取成功,false代表失败。在线程阻塞一段时间后,会自动唤醒,对中断敏感。
void release()释放一个许可(增加信号量)
void acquire(int permits) throws InterruptedException申请一定数量的许可(permits参数值),线程会被阻塞直到申请到对应数量的信号量或者被当前线程被中断
void acquireUninterruptibly(int permits)申请一定数量的许可(permits参数值),线程会被阻塞直到申请到对应数量的信号量。如果当前线程在等待许可时被中断,那么它将继续等待,但线程被分配许可的时间可能会与它在没有中断发生时收到许可的时间相比发生变化。 当线程确实从此方法返回时,将设置其中断状态。
boolean tryAcquire(int permits)以自旋方式尝试获取申请一个许可(减少信号量),方法不会阻塞线程,true代表获取成功,false代表失败。
boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException尝试获取申请一定数量的许可(减少信号量),true代表获取成功,false代表失败。在线程阻塞一段时间后,会自动唤醒,对中断敏感。
void release(int permits)释放一定数量的许可(增加信号量)
int availablePermits()返回可用数量的许可
int drainPermits()申请剩余的所有的许可,返回值为可用许可数量
void reducePermits(int reduction)减少一定数据量(参数值reduction)的许可
boolean isFair()是否是公平模式 true代表公平,false代表非公平
final boolean hasQueuedThreads()是否存在同步队列,true代表有,false代表没有
final int getQueueLength()查询同步队列的长度

示例

在获取项目之前,每个线程必须从信号量中获取许可,以保证项目可供使用。 当线程完成该项目时,它会返回到池中,并向信号量返回一个许可,允许另一个线程获取该项目。 请注意,调用 acquire() 时不会持有同步锁,因为这会阻止项目返回到池中。 信号量封装了限制对池的访问所需的同步,与维护池本身一致性所需的任何同步分开。

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class Pool {

    /**
     * 最大可用线程数,也是资源的数量
     */
    private static final int MAX_AVAILABLE = 5;
    /**
     * 采用公平模式的信号量
     */
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    /**
     * 获取元素
     *
     */
    public Object getItem() throws InterruptedException {
        available.acquire();
        return getNextAvailableItem();
    }

    /**
     * 放入元素
     *
     */
    public void putItem(Object x) {
        if (markAsUnused(x)) {
            available.release();
        }
    }

    // 不是特别有效的数据结构;仅用于演示

    // item代表资源-也是5个元素
    protected Object[] items = new Object[]{"A", "B", "C", "D", "E"};
    // 各下标的与item中的元素对应,用来标明item中元素是否可以被操作,true代表元素正在被使用,false代表未被使用。
    protected boolean[] used = new boolean[MAX_AVAILABLE];

    /**
    * 获取下一个可以被使用的元素
    */
    protected synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null; // not reached
    }

    /**
     * 将元素变为未被使用
     *
     */
    protected synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else {
                    return false;
                }
            }
        }
        return false;
    }


    static class Worker implements Runnable {
        private final Pool pool;

        Worker(Pool pool) {
            this.pool = pool;
        }

        @Override
        public void run() {
            try {
                Object object = pool.getItem();
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "]" + Thread.currentThread().getName() + ": get item is " + object + ", availablePermits is " + pool.available.availablePermits() + " is start. ");
                // 休眠2s,放大程序执行时间
                Thread.sleep(2000);
                pool.putItem(object);
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "]" + Thread.currentThread().getName() + ": get item is " + object + ", availablePermits is " + pool.available.availablePermits() + " is done. ");

            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + "is interrupted");
            }
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool();

        ExecutorService e = Executors.newFixedThreadPool(15);
        // 新建15个线程去执行
        for (int i = 0; i < 15; i++) {
            e.execute(new Worker(pool));
        }
        e.shutdown();
    }
}

执行结果。可以看出15个线程分成了3批去执行。

[20:40:39] pool-1-thread-3: get item is C, availablePermits is 0 is start. 
[20:40:39] pool-1-thread-1: get item is A, availablePermits is 0 is start. 
[20:40:39] pool-1-thread-2: get item is B, availablePermits is 0 is start. 
[20:40:39] pool-1-thread-5: get item is E, availablePermits is 0 is start. 
[20:40:39] pool-1-thread-4: get item is D, availablePermits is 0 is start. 
[20:40:41] pool-1-thread-6: get item is A, availablePermits is 0 is start. 
[20:40:41] pool-1-thread-3: get item is C, availablePermits is 0 is done. 
[20:40:41] pool-1-thread-8: get item is C, availablePermits is 0 is start. 
[20:40:41] pool-1-thread-7: get item is B, availablePermits is 0 is start. 
[20:40:41] pool-1-thread-2: get item is B, availablePermits is 0 is done. 
[20:40:41] pool-1-thread-9: get item is E, availablePermits is 0 is start. 
[20:40:41] pool-1-thread-1: get item is A, availablePermits is 0 is done. 
[20:40:41] pool-1-thread-4: get item is D, availablePermits is 0 is done. 
[20:40:41] pool-1-thread-5: get item is E, availablePermits is 0 is done. 
[20:40:41] pool-1-thread-10: get item is D, availablePermits is 0 is start. 
[20:40:43] pool-1-thread-11: get item is A, availablePermits is 0 is start. 
[20:40:43] pool-1-thread-12: get item is B, availablePermits is 0 is start. 
[20:40:43] pool-1-thread-13: get item is C, availablePermits is 0 is start. 
[20:40:43] pool-1-thread-6: get item is A, availablePermits is 0 is done. 
[20:40:43] pool-1-thread-7: get item is B, availablePermits is 0 is done. 
[20:40:43] pool-1-thread-8: get item is C, availablePermits is 0 is done. 
[20:40:43] pool-1-thread-15: get item is D, availablePermits is 0 is start. 
[20:40:43] pool-1-thread-14: get item is E, availablePermits is 0 is start. 
[20:40:43] pool-1-thread-10: get item is D, availablePermits is 0 is done. 
[20:40:43] pool-1-thread-9: get item is E, availablePermits is 0 is done. 
[20:40:45] pool-1-thread-12: get item is B, availablePermits is 2 is done. 
[20:40:45] pool-1-thread-13: get item is C, availablePermits is 3 is done. 
[20:40:45] pool-1-thread-11: get item is A, availablePermits is 3 is done. 
[20:40:45] pool-1-thread-14: get item is E, availablePermits is 5 is done. 
[20:40:45] pool-1-thread-15: get item is D, availablePermits is 5 is done. 

如果只定义了1个信号量,那么会退化成排他锁。

源码分析

举报

相关推荐

0 条评论