0
点赞
收藏
分享

微信扫一扫

每天学习一点点之 Java 七大阻塞队列之 LinkedBlockingQueue(理解线程安全)


相关文章:

  • ​​Servlet 是单例还是多例的​​

前几天在一个技术群里讨论起了“线程安全”,有同学对线程安全的理解是“描述一个类或者一个库,可以不需要场景,那指的是你随便用不会出问题”,而我个人更倾向于讨论“线程安全”是需要具备前提的,即要看对共有变量的使用场景。当然这种“概念性的问题”本身也不具备很大的讨论意义。由于我在表达自己观点的时候以 ​​LinkedBlockingQueue​​​ 作为自己的论据,本文主要就是谈谈我个人对 ​​LinkedBlockingQueue​​ 中“线程安全”的理解。

研究切入点

还是之前在很多文章中都提到的个人观点,研究源码耗时耗力,需要抓大放小,这样才能“偷懒”去享受生活。所以要找一个切入点,避免过多的陷入在不必要的细节中。

上面也提到了,主要是想谈谈 ​​LinkedBlockingQueue​​​ 中的“线程安全”,那么在分析 ​​LinkedBlockingQueue​​​ 的线程安全的时候,自然共有变量就是 ​​LinkedBlockingQueue​​​,队列我们常用的操作是入队和出队,但也有其他的操作,比如查队列的容量和遍历。所以我们就研究 ​​LinkedBlockingQueue​​ 的四个操作:写、取、查询容量、遍历,后面两个操作可以统归为查询类操作。

锁分析

提到线程安全,一般都会想到锁(当然也存在“无锁”和特殊的设计去保证线程安全),那么就直奔主题,看看 ​​LinkedBlockingQueue​​ 中有没有用到锁或者类似锁的机制。

很容易就能找到:

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

也能够看到 ​​LinkedBlockingQueue​​​ 中用到了两把非公平锁,即“写锁”和“取锁”。这里大概就能猜到(一般来说)​​LinkedBlockingQueue​​​ 是支持同时写取的。而 ​​ArrayBlockingQueue​​ 只有一把锁:

/** Main lock guarding all access */
final ReentrantLock lock;

所以 ​​ArrayBlockingQueue​​​ 是不能同时写取的,这也是 ​​ArrayBlockingQueue​​​ 和 ​​LinkedBlockingQueue​​ 的一个区别。

写操作

其实队列的写方法也有好几个:

Throws exception

Special value

Blocks

Times out

Insert

​​add(e)​​

​​offer(e)​​

​​put(e)​​

​​offer(e, time, unit)​​

Remove

​​remove()​​

​​poll()​​

​​take()​​

​​poll(time, unit)​​

Examine

​​element()​​

​​peek()​​

not applicable

not applicable

那么到底从哪一个入手呢,即然是阻塞队列,那么肯定研究有阻塞的 ​​put​​ 方法了。

写安全

​put​​ 方法如下:

public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
//获取取锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//可中断加锁
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

可以看到 ​​put​​ 方法一开始就加锁了,就已经保证了同一时间只能有一个线程进行写操作。

写操作的阻塞和唤醒

写操作的阻塞有两种情况:

  1. 写写互斥
  2. 队列满了,写阻塞

可以看到 ​​putLock​​​ 有一个 ​​Condition​​​:​​notFull​​​,对于 ​​Condition​​​,有 ​​await​​​ 就肯定有 ​​signal​​​。再看 ​​put​​ 方法:

public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
//获取取锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//可中断加锁,加锁本身就有锁持有阻塞
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
//如果容量已经满了,就会阻塞
while (count.get() == capacity) {
notFull.await(); //释放锁
}
enqueue(node);
c = count.getAndIncrement();
//要注意,此时c+1是当前队列的容量,如果当前队列没有满,那么就会唤醒其他的put阻塞线程,put不是只能被动的被take去唤醒,也可以自己唤醒
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

队列满了 ​​put​​​ 会阻塞,那 ​​take​​​ 的时候肯定也会去唤醒 ​​put​​​ 的阻塞。再看 ​​take​​ 方法:

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//获取取锁
final ReentrantLock takeLock = this.takeLock;
//可中断加锁
takeLock.lockInterruptibly();
try {
//如果队列是空的,阻塞
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
//如果队列不是空的,唤醒take阻塞线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//这里的c是出队之前的数量,即take操作之前的数量,也就是说如果take之前队列是满的,take后就就唤醒put阻塞的线程
if (c == capacity)
signalNotFull();
return x;
}

里面调用了 ​​signalNotFull​​​ 方法。即 ​​take​​​ 操作前,如果队列是满的,那么就会在 ​​take​​​ 操作后唤醒 ​​put​​ 阻塞的线程。

取操作

取安全

取方法自然就要选择 ​​take​​ :

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//获取取锁
final ReentrantLock takeLock = this.takeLock;
//可中断加锁
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//c是元素数量,即如果
if (c == capacity)
signalNotFull();
return x;
}

可以看到 ​​take​​ 方法一开始就加锁了,就已经保证了同一时间只能有一个线程进行取操作。

取操作的阻塞和唤醒

同写操作一样,取操作的阻塞有两种情况:

  1. 取取互斥
  2. 队列空了,取阻塞

​take​​ 方法如下:

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//获取取锁
final ReentrantLock takeLock = this.takeLock;
//可中断加锁
takeLock.lockInterruptibly();
try {
//如果队列是空的,阻塞
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
//如果队列不是空的,唤醒take阻塞线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//这里的c是出队之前的数量,即take操作之前的数量,也就是说如果take之前队列是满的,take后就就唤醒put阻塞的线程
if (c == capacity)
signalNotFull();
return x;
}

队列空了 ​​take​​​ 会阻塞,那 ​​put​​​ 的时候肯定也会去唤醒 ​​take​​​ 的阻塞。再看 ​​put​​ 方法:

public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
//获取取锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//可中断加锁,加锁本身就有锁持有阻塞
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
//如果容量已经满了,就会阻塞
while (count.get() == capacity) {
notFull.await(); //释放锁
}
enqueue(node);
c = count.getAndIncrement();
//要注意,此时c+1是当前队列的容量,如果当前队列没有满,那么就会唤醒其他的put阻塞线程,put不是只能被动的被take去唤醒,也可以自己唤醒
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
//这里c是put前的数量,put前队列是空的,那么put后就去唤醒take的阻塞线程
if (c == 0)
signalNotEmpty();
}

查询操作

查询操作这里分成两类方法讨论:

  1. 查询容量
  2. 遍历

查询容量操作

直接看 ​​remainingCapacity​​ 方法:

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking. This is always equal to the initial capacity of this queue
* less the current {@code size} of this queue.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
* an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*/
public int remainingCapacity() {
return capacity - count.get();
}

可以发现这个方法没有加任何锁。注释也有说明,你不能通过这个方法来判断是否写元素成功。说白了,这个方法返回的是一个“可能”准确的值。比如如果当前使用场景只能想“大概”知道队列容量,如线程池的监控,那么这个方法也是可以使用的。

遍历操作

直接看 ​​iterator​​ 方法:

/**
* Returns an iterator over the elements in this queue in proper sequence.
* The elements will be returned in order from first (head) to last (tail).
*
* <p>The returned iterator is
* <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
*
* @return an iterator over the elements in this queue in proper sequence
*/
public Iterator<E> iterator() {
return new Itr();
}

private class Itr implements Iterator<E> {
/*
* Basic weakly-consistent iterator. At all times hold the next
* item to hand out so that if hasNext() reports true, we will
* still have it to return even if lost race with a take etc.
*/

private Node<E> current;
private Node<E> lastRet;
private E currentElement;

Itr() {
fullyLock();
try {
current = head.next;
if (current != null)
currentElement = current.item;
} finally {
fullyUnlock();
}
}
...
...
}

这里可以看到一个很有意思的方法:​​fullyLock​​:

/**
* Locks to prevent both puts and takes.
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}

也就是说在遍历的时候,写锁和取锁我都给你锁住了,这样可以保证遍历期间的数据是完全准确的。

总结

本文简要分析了 ​​LinkedBlockingQueue​​​ 的写、取、容量查询、遍历四种操作的源码,发现写和取作为更新类操作,​​LinkedBlockingQueue​​ 是可以保证线程安全的,但是查询容量在多线程环境下返回的结果并不一定可靠,而遍历操作同样作为查询类操作却是可以保证遍历期间数据的准确性。

本文也是基于对 ​​LinkedBlockingQueue​​​ 的写、取、查询三类操作的源码分析表达一个个人观点:我们讨论线程安全于不安全的时候,一定要注意共有变量的使用场景,忽略这个场景去讨论所谓的线程安全与不安全是没有意义的,就好比 ​​SynchronizedList​​​ 和 ​​CopyOnWriteArrayList​​​ 我们总会说它们是“线程安全”的 ​​List​​:

每天学习一点点之 Java 七大阻塞队列之 LinkedBlockingQueue(理解线程安全)_ci

但真的在任何场景下它们的“线程安全”都是一样的嘛?

为什么一直在强调“使用场景“呢,因为大多数场景下我们会存在“取舍”,即某些操作我需要“强制”线程安全,确保数据的准确性,但是某些操作呢,不完全准确我也能接收,有点类似于分布式系统中 CAP 和 BASE 的讨论。

废话了一整篇,那么当面试的时候被问到”​​HashMap​​ 是线程安全的吗?“、”Spring Controller 是线程安全的吗“、“Servlet 是线程安全的 吗”的时候,你觉得怎么回答比较好呢?

References

  • ​​https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/BlockingQueue.html​​

每天学习一点点之 Java 七大阻塞队列之 LinkedBlockingQueue(理解线程安全)_加锁_02


举报

相关推荐

0 条评论