目录
注意:本文参考 【Java总结】生产者消费者模型_zhujohnle的专栏-CSDN博客
生产者消费者模型Java实现 - 简书
消费者读取处理慢_Java高并发之Disruptor框架:单线程每秒处理600万订单高并发框架..._weixin_29628611的博客-CSDN博客
生产者消费者模型简介
生产者消费者模型主要结构如下,是一个典型的线程同步的案例。下面就来使用java做几种线程同步的方式来实现以下该模型
确保一个生产者消费者模型的稳定运行的前提有以下几个
生成者应该具备持续生成的能力
消费者应该具备持续消费的能力
生产者的生成和消费消费有一定的阀值,如生成总量到100需要停止生产,通知消费;消费到0的时候停止消费开始生产;
wait,notify方案
wait,notify方案 主要是通过,使用对象的wait方法和notify方法来实现线程的切换执行。其中我们可以看到对象的wait和notify或者notifyAll方法都是调用native的对应的方法来处理,追溯到最后也还是控制cpu进行不同的时间片的切换
下面这个例子比较简单,模拟一个生产速度大于消费速度的这样一个案例,在生产到阀值的时候停止生产通知消费者进行消费(wait)。消费者在消费到一定阀值的时候停止消费通知生产者进行生产(notifyall)
public class TestWaitNotifyConsumerAndProducer {
/*当前生成数量*/
static int currentNum = 0;
/*最大生成数量*/
static int MAX_NUM = 10;
/*最小消费数量*/
static int MIN_NUM = 0;
/*wait和notify控制对象*/
private static final String lock = "lock";
public static void main(String args[]) {
//创建一个生产者
new Thread(new Producer()).start();
//创建两个消费者
new Thread(new Consumer()).start();
new Thread(new Consumer()).start();
}
static class Producer implements Runnable {
public void product() {
while (true) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) {
currentNum++;
System.out.println("Producer now product num:" + currentNum);
lock.notifyAll();
if (currentNum == MAX_NUM) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
@Override
public void run() {
product();
}
}
static class Consumer implements Runnable {
public void consume() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) {
if (currentNum == MIN_NUM) {
lock.notifyAll();
continue;
}
System.out.println(new StringBuilder(Thread.currentThread().getName())
.append(" Consumer now consumption num:").append(currentNum));
currentNum--;
}
}
}
@Override
public void run() {
consume();
}
}
}
public class ProducerConsumer1 {
class Producer extends Thread {
private String threadName;
private Queue<Goods> queue;
private int maxSize;
public Producer(String threadName, Queue<Goods> queue, int maxSize) {
this.threadName = threadName;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
//模拟生产过程中的耗时操作
Goods goods = new Goods();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (queue) {
while (queue.size() == maxSize) {
try {
System.out.println("队列已满,【" + threadName + "】进入等待状态");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(goods);
System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
queue.notifyAll();
}
}
}
}
class Consumer extends Thread {
private String threadName;
private Queue<Goods> queue;
public Consumer(String threadName, Queue<Goods> queue) {
this.threadName = threadName;
this.queue = queue;
}
@Override
public void run() {
while (true) {
Goods goods;
synchronized (queue) {
while (queue.isEmpty()) {
try {
System.out.println("队列已空,【" + threadName + "】进入等待状态");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
goods = queue.remove();
System.out.println("【" + threadName + "】消费了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
queue.notifyAll();
}
//模拟消费过程中的耗时操作
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@Test
public void test() {
int maxSize = 5;
Queue<Goods> queue = new LinkedList<>();
Thread producer1 = new Producer("生产者1", queue, maxSize);
Thread producer2 = new Producer("生产者2", queue, maxSize);
Thread producer3 = new Producer("生产者3", queue, maxSize);
Thread consumer1 = new Consumer("消费者1", queue);
Thread consumer2 = new Consumer("消费者2", queue);
producer1.start();
producer2.start();
producer3.start();
consumer1.start();
consumer2.start();
while (true) {
}
}
}
1 确定锁的对象是队列queue
;
2 不要把生产过程和消费过程写在同步块中,这些操作无需同步,同步的仅仅是放入和取出这两个动作;
3 因为是持续生产,持续消费,要用while(true){...}
的方式将【生产、放入】或【取出、消费】的操作都一直进行。
4 但由于是对队列使用synchronized
的方式加锁,同一时刻,要么在放入,要么在取出,两者不能同时进行。
ReentrantLock的实现
ReentrantLock 也是java.util.concurrent中显示锁的一种,允许同一个线程重复进入一段执行代码(递归),并且反复lock加锁。重复加锁相当于计数器累加,因此当一个线程想释放这个锁的时候就需要有对应的unlock执行。
如下这段代码依然是我们开篇讲到的生成和消费的模型,只是换了一种实现;这里特别注意的是可重入锁或者递归锁需要成对的出现lock和unlock否则执行Condition的await 或者signal的时候就可能如下抛出
java.lang.IllegalMonitorStateException
public class TestReentrantLockConsumerAndProducer {
/*当前生成数量*/
static int currentNum = 0;
/*最大生成数量*/
static int MAX_NUM = 10;
/*最小消费数量*/
static int MIN_NUM = 0;
//创建一个锁对象
private static Lock lock = new ReentrantLock();
//缓冲区已空的变量
private static final Condition emptyCondition = lock.newCondition();
//缓冲区已满的变量
private static final Condition fullCondition = lock.newCondition();
public static void main(String args[]) {
//创建一个生产者
new Thread(new Producer()).start();
//创建两个消费者
new Thread(new Consumer()).start();
new Thread(new Consumer()).start();
}
static class Producer implements Runnable {
public void product() {
while (true) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
currentNum++;
System.out.println("Producer now product num:" + currentNum);
if (currentNum == MAX_NUM) {
emptyCondition.signal();
try {
fullCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lock.unlock();
}
}
@Override
public void run() {
product();
}
}
static class Consumer implements Runnable {
public void consume() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
if (currentNum == MIN_NUM) {
fullCondition.signal();
try {
emptyCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
System.out.println(new StringBuilder(Thread.currentThread().getName())
.append(" Consumer now consumption num:").append(currentNum));
currentNum--;
lock.unlock();
}
}
@Override
public void run() {
consume();
}
}
}
public class ProducerConsumer2 {
class Producer extends Thread {
private String threadName;
private Queue<Goods> queue;
private Lock lock;
private Condition notFullCondition;
private Condition notEmptyCondition;
private int maxSize;
public Producer(String threadName, Queue<Goods> queue, Lock lock, Condition notFullCondition, Condition notEmptyCondition, int maxSize) {
this.threadName = threadName;
this.queue = queue;
this.lock = lock;
this.notFullCondition = notFullCondition;
this.notEmptyCondition = notEmptyCondition;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
//模拟生产过程中的耗时操作
Goods goods = new Goods();
try {
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try {
while (queue.size() == maxSize) {
try {
System.out.println("队列已满,【" + threadName + "】进入等待状态");
notFullCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(goods);
System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
notEmptyCondition.signalAll();
} finally {
lock.unlock();
}
}
}
}
class Consumer extends Thread {
private String threadName;
private Queue<Goods> queue;
private Lock lock;
private Condition notFullCondition;
private Condition notEmptyCondition;
public Consumer(String threadName, Queue<Goods> queue, Lock lock, Condition notFullCondition, Condition notEmptyCondition) {
this.threadName = threadName;
this.queue = queue;
this.lock = lock;
this.notFullCondition = notFullCondition;
this.notEmptyCondition = notEmptyCondition;
}
@Override
public void run() {
while (true) {
Goods goods;
lock.lock();
try {
while (queue.isEmpty()) {
try {
System.out.println("队列已空,【" + threadName + "】进入等待状态");
notEmptyCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
goods = queue.remove();
System.out.println("【" + threadName + "】消费了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
notFullCondition.signalAll();
} finally {
lock.unlock();
}
//模拟消费过程中的耗时操作
try {
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@Test
public void test() {
int maxSize = 5;
Queue<Goods> queue = new LinkedList<>();
Lock lock = new ReentrantLock();
Condition notEmptyCondition = lock.newCondition();
Condition notFullCondition = lock.newCondition();
Thread producer1 = new ProducerConsumer2.Producer("生产者1", queue, lock, notFullCondition, notEmptyCondition, maxSize);
Thread producer2 = new ProducerConsumer2.Producer("生产者2", queue, lock, notFullCondition, notEmptyCondition, maxSize);
Thread producer3 = new ProducerConsumer2.Producer("生产者3", queue, lock, notFullCondition, notEmptyCondition, maxSize);
Thread consumer1 = new ProducerConsumer2.Consumer("消费者1", queue, lock, notFullCondition, notEmptyCondition);
Thread consumer2 = new ProducerConsumer2.Consumer("消费者2", queue, lock, notFullCondition, notEmptyCondition);
Thread consumer3 = new ProducerConsumer2.Consumer("消费者3", queue, lock, notFullCondition, notEmptyCondition);
producer1.start();
producer2.start();
producer3.start();
consumer1.start();
consumer2.start();
consumer3.start();
while (true) {
}
}
}
要注意的地方:
放入和取出操作均是用的同一个锁,所以在同一时刻,要么在放入,要么在取出,两者不能同时进行。因此,与使用wait()和notify()实现类似,这种方式的实现并不能最大限度地利用缓冲区(即例子中的队列)。如果要实现同一时刻,既可以放入又可以取出,则要使用两个重入锁,分别控制放入和取出的操作,具体实现可以参考LinkedBlockingQueue
。
阻塞队列的实现
阻塞队列的实现本质上也还是基于可重入锁,只是进行了进一步的封装,有一个队列的数据结构
这里我们用到了一个数据结构LinkedBlockingDeque的双向队列的结构,当然我们使用任何一个阻塞队列都可以。这里主要用到阻塞队列超过最大容量的时候,自动阻塞等待;
这里使用了几个关键的方法peek,put,peekLast,take 这里大概看看相关源码。 这里源码我们不做过多解读,大概可以看到阻塞队列的内部实现也是依赖于可重入锁ReentrantLock,然后根据put和take的操作,动态的管理锁的获取和释放。
方法/方式处理 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | 不可用 | 不可用 |
所以,在这里,要选用put()
和take()
这两个会阻塞的方法。
/** Maximum number of items in the deque */
private final int capacity;
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
public E peekFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (first == null) ? null : first.item;
} finally {
lock.unlock();
}
}
public E peekLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (last == null) ? null : last.item;
} finally {
lock.unlock();
}
}
//take元素的时候执行,可见如果当队列内容为空的时候阻塞
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
/**
* Removes and returns first element, or null if empty.
* 具体删除操作,当操作删除完成以后执行notFull.signal();释放notFull的阻塞
*/
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
Node<E> f = first;
if (f == null)
return null;
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
--count;
notFull.signal();
return item;
}
/**
* Links node as last element, or returns false if full.
* 当执行成功添加时,释放notEmpty的信号
*/
private boolean linkLast(Node<E> node) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
++count;
notEmpty.signal();
return true;
}
public class TestBlockQueueConsumerAndProducer {
/*最大生成数量*/
static int MAX_NUM = 10;
private static LinkedBlockingDeque mBlockQueue = new LinkedBlockingDeque<Integer>(MAX_NUM);
public static void main(String args[]) {
//创建一个生产者
new Thread(new Producer()).start();
//创建两个消费者
new Thread(new Consumer()).start();
//创建两个消费者
new Thread(new Consumer()).start();
}
static class Producer implements Runnable {
public void product() {
while (true) {
if (mBlockQueue.peek() == null) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 1; i <= 10; i++) {
try {
mBlockQueue.put(i);
System.out.println("Producer now product num:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
@Override
public void run() {
product();
}
}
static class Consumer implements Runnable {
public void consume() {
while (true) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
Integer mLast = (Integer) mBlockQueue.peekLast();
if (mLast != null && mLast == MAX_NUM) {
try {
int num = (Integer) mBlockQueue.take();
System.out.println(new StringBuilder(Thread.currentThread().getName())
.append(" Consumer now consumption num:").append(num));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@Override
public void run() {
consume();
}
}
}
public class ProducerConsumer3 {
class Producer extends Thread {
private String threadName;
private BlockingQueue<Goods> queue;
public Producer(String threadName, BlockingQueue<Goods> queue) {
this.threadName = threadName;
this.queue = queue;
}
@Override
public void run() {
while (true){
Goods goods = new Goods();
try {
//模拟生产过程中的耗时操作
Thread.sleep(new Random().nextInt(100));
queue.put(goods);
System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer extends Thread {
private String threadName;
private BlockingQueue<Goods> queue;
public Consumer(String threadName, BlockingQueue<Goods> queue) {
this.threadName = threadName;
this.queue = queue;
}
@Override
public void run() {
while (true){
try {
Goods goods = queue.take();
System.out.println("【" + threadName + "】消费了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
//模拟消费过程中的耗时操作
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@Test
public void test() {
int maxSize = 5;
BlockingQueue<Goods> queue = new LinkedBlockingQueue<>(maxSize);
Thread producer1 = new ProducerConsumer3.Producer("生产者1", queue);
Thread producer2 = new ProducerConsumer3.Producer("生产者2", queue);
Thread producer3 = new ProducerConsumer3.Producer("生产者3", queue);
Thread consumer1 = new ProducerConsumer3.Consumer("消费者1", queue);
Thread consumer2 = new ProducerConsumer3.Consumer("消费者2", queue);
producer1.start();
producer2.start();
producer3.start();
consumer1.start();
consumer2.start();
while (true) {
}
}
}
如果使用LinkedBlockingQueue作为队列实现,则可以实现:在同一时刻,既可以放入又可以取出,因为LinkedBlockingQueue内部使用了两个重入锁,分别控制取出和放入。
如果使用ArrayBlockingQueue作为队列实现,则在同一时刻只能放入或取出,因为ArrayBlockingQueue内部只使用了一个重入锁来控制并发修改操作。
使用信号量Semaphore实现
前提是熟悉信号量Semaphore
的使用方式,尤其是release()
方法,Semaphore
在release
之前不必一定要先acquire
。
public class ProducerConsumer4 {
class Producer extends Thread {
private String threadName;
private Queue<Goods> queue;
private Semaphore queueSizeSemaphore;
private Semaphore concurrentWriteSemaphore;
private Semaphore notEmptySemaphore;
public Producer(String threadName, Queue<Goods> queue, Semaphore concurrentWriteSemaphore, Semaphore queueSizeSemaphore, Semaphore notEmptySemaphore) {
this.threadName = threadName;
this.queue = queue;
this.concurrentWriteSemaphore = concurrentWriteSemaphore;
this.queueSizeSemaphore = queueSizeSemaphore;
this.notEmptySemaphore = notEmptySemaphore;
}
@Override
public void run() {
while (true) {
//模拟生产过程中的耗时操作
Goods goods = new Goods();
try {
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
queueSizeSemaphore.acquire();//获取队列未满的信号量
concurrentWriteSemaphore.acquire();//获取读写的信号量
queue.add(goods);
System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
concurrentWriteSemaphore.release();
notEmptySemaphore.release();
}
}
}
}
class Consumer extends Thread {
private String threadName;
private Queue<Goods> queue;
private Semaphore queueSizeSemaphore;
private Semaphore concurrentWriteSemaphore;
private Semaphore notEmptySemaphore;
public Consumer(String threadName, Queue<Goods> queue, Semaphore concurrentWriteSemaphore, Semaphore queueSizeSemaphore, Semaphore notEmptySemaphore) {
this.threadName = threadName;
this.queue = queue;
this.concurrentWriteSemaphore = concurrentWriteSemaphore;
this.queueSizeSemaphore = queueSizeSemaphore;
this.notEmptySemaphore = notEmptySemaphore;
}
@Override
public void run() {
while (true) {
Goods goods;
try {
notEmptySemaphore.acquire();
concurrentWriteSemaphore.acquire();
goods = queue.remove();
System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
concurrentWriteSemaphore.release();
queueSizeSemaphore.release();
}
//模拟消费过程中的耗时操作
try {
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@Test
public void test() {
int maxSize = 5;
Queue<Goods> queue = new LinkedList<>();
Semaphore concurrentWriteSemaphore = new Semaphore(1);
Semaphore notEmptySemaphore = new Semaphore(0);
Semaphore queueSizeSemaphore = new Semaphore(maxSize);
Thread producer1 = new ProducerConsumer4.Producer("生产者1", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
Thread producer2 = new ProducerConsumer4.Producer("生产者2", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
Thread producer3 = new ProducerConsumer4.Producer("生产者3", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
Thread consumer1 = new ProducerConsumer4.Consumer("消费者1", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
Thread consumer2 = new ProducerConsumer4.Consumer("消费者2", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
Thread consumer3 = new ProducerConsumer4.Consumer("消费者3", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
producer1.start();
producer2.start();
producer3.start();
consumer1.start();
consumer2.start();
consumer3.start();
while (true) {
}
}
}
1 理解代码中的三个信号量的含义
queueSizeSemaphore:(其中的许可证数量,可以理解为队列中可以再放入多少个元素),该信号量的许可证初始数量为仓库大小,即maxSize
;生产者每放置一个商品,则该信号量-1,即执行acquire()
,表示队列中已经添加了一个元素,要减少一个许可证;消费者每取出一个商品,该信号量+1,即执行release()
,表示队列中已经少了一个元素,再给你一个许可证。
notEmptySemaphore:(其中的许可证数量,可以理解为队列中可以取出多少个元素),该信号量的许可证初始数量为0;生产者每放置一个商品,则该信号量+1,即执行release()
,表示队列中添加了一个元素;消费者每取出一个商品,该信号量-1,即执行acquire()
,表示队列中已经少了一个元素,要减少一个许可证;
concurrentWriteSemaphore,相当于一个写锁,在放入或取出商品的时候,都需要先获取再释放许可证。
2 由于实现中,使用了concurrentWriteSemaphore
实现了对队列并发写的控制,在同一时刻,只能对队列进行一种操作:放入或取出。假如把concurrentWriteSemaphore
中的信号量初始化为2或者2以上的值,就会出现多个生产者同时放入或多个消费者同时消费的情况,而使用的LinkedList
是不允许并发进行这种修改的,否则会出现溢出或取空的情况。所以,concurrentWriteSemaphore
只能设置为1,也就导致性能与使用wait() / notify()
方式类似,性能不高。
无锁的缓存框架: Disruptor
BlockingQueue 实现生产者和消费者模式简单易懂,但是BlockingQueue
并不是一个高性能的实现:它完全使用锁和阻塞来实现线程之间的同步。在高并发的场合,它的性能并不是特别的优越。(ConconcurrentLinkedQueue
是一个高性能的队列,但并不没有实现BlockingQueue
接口,即不支持阻塞操作)。
Disruptor是LMAX公司开发的高效的无锁缓存队列。它使用无锁的方式实现了一个环形队列,非常适合于实现生产者和消费者模式,如:事件和消息的发布。
1. 弃用锁机制转而使用CAS。
Disruptor论文中讲述了我们所做的一个实验。这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。当单线程无锁时,程序耗时300ms。如果增加一个锁(仍是单线程、没有竞争、仅仅增加锁),程序需要耗时10000ms,慢了两个数量级。更令人吃惊的是,如果增加一个线程(简单从逻辑上想,应该比单线程加锁快一倍),耗时224000ms。
所以锁的开销实际上已经被证实是非常的大了,如果减少锁的使用,降低锁的粒度,这是disruptor提供给我们的另外一种思路。
2. 为了解决伪共享而引入缓存行填充。
伪共享讲的是多个CPU时的123级缓存的问题,通常,缓存是以缓存行的方式读取数据,如图,当两个CPU同样都把X,Y load到自己的一级缓存时,实际上CPU为了争用X,Y还是会在写入时发生竞争,这样同样会导致锁,效率下降这些问题。
如何解决这个问题呢?实际上使用的是与内存对齐一样的方法,就是每次把数据对齐到跟缓存行(通常是64B)一样大小。其实内存对齐和解决伪共享的缓存行对齐其实是同一种思路。所以在disruptor源码中可以看到这样一个内部类,它为什么写成这样,其实就是为了缓存行对齐。
private static class Padding
{
/** Set to -1 as sequence starting point */
public long nextValue = Sequence.INITIAL_VALUE, cachedValue = Sequence.INITIAL_VALUE, p2, p3, p4, p5, p6, p7;
}
3. 使用一种独特的数据结构RingBuffer来代替Queue。这个数据结构的使用使得弃用锁转而用CAS成为了可能。
大家可以想象,如果要自己编写一段生产者消费者的程序,你会怎么做呢?
当然了,大多数人都会用一个队列来实现,说白了,就是把队列作为生产者和消费者之间的缓冲,从而间接的同步了他们的速度,使得异步编程成为可能。
说到异步编程,可能大家都再熟悉不过了,实际上java的图形界面就是一个典型的异步编程的例子,作为处理界面事件的消费者其实只有一个线程,而更新界面的生产者会是多个线程,由于采用了生产者消费者模型,生产者生产的界面事件会保存在一个队列里,由这个内置的线程统一去做更新,这样就保证了我们编程时不会人为破坏界面绘制的过程,从而提高代码质量,降低复杂度。顺带一提,.net的delegate也是同样的道理,如果用其他线程更新界面会出现很多怪异的问题,所以delegate通过一个事件队列,同样实现了消费者在一个单独的线程中。
扯远了。总之我们会使用一个队列来处理这种问题,这里存在几个问题,第一,如果生产者生产过快,一定时间后,队列会变得过度膨胀,占用内存空间;第二,看过队列的实现会发现,为了保证多个线程访问的正确性,在操作队列时是一定要加锁的,前面也说了,加锁以后时间会慢几个量级。
disruptor框架的设计带我们走出了这个思维定势。我们一定要用不断增长的队列吗?我们访问队列一定要加锁吗?答案都是否定的。
所谓的ringbuffer,就是一个环形队列。我们来看看为了解决这两个问题他是如何做的。
1. 一个环形队列,意味着首尾相连,也就是他的大小是有限制的,但是ringbuffer是基于这样一个假设:即生产者和消费者都在同步往前走,不存在某边特别快,这样这个对列就可以循环使用,不必有创建对象的开销,另外,ringbuffer由于是固定大小的,使用数组来保存,由于预先分配好了空间,所以定位和查找的速度自然不必说。所以它自然的解决了第一个问题。但是如果实际使用中确实是生产者快于消费者(或者反过来)呢?也就是说,在某个时间点,他们一定会首尾相遇,这时候会发生什么呢?这个问题我们随后解释。
2.为了让队列安全的在多线程环境运行,需要整个队列上锁,带来的开销是巨大的。来看ringbuffer是如何无锁的(简化起见,讨论一个生产者一个消费者的情况)。为了解释这个原理,这里借用别人的图来阐述这个问题。比如目前有一个consumer,停留在位置12,这时producer假设在位置3,这时producer的下一步是如何处理的呢?producer会尝试读取4,发现没有到下一个consumer,所以可以安全获取,于是将之改为14,并且将产品publish到14,并且通知一个阻塞的consumer起来活动。如此一直到11都是安全的(这里我们假设生产者比较快),当producer尝试访问12时发现不能继续,于是交出控制权;而consumer开始移动时,会调用barrier的waitFor方法,waitFor看到前面最近的安全节点已经到了20(21是producer),于是直接返回20,所以现在consumer可以无锁的去消费13到20的所有产品,可以想象,这种方式比起synchronized要快上很多倍。
生产者
package cn.lonecloud.procum.disruptor;
import cn.lonecloud.procum.Data;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
/**
* @author lonecloud
* @version v1.0
* @date 下午3:02 2018/5/7
*/
public class Producer {
//队列
private final RingBuffer<Data> dataRingBuffer;
public Producer(RingBuffer<Data> dataRingBuffer) {
this.dataRingBuffer = dataRingBuffer;
}
/**
* 插入数据
* @param s
*/
public void pushData(String s) {
//获取下一个位置
long next = dataRingBuffer.next();
try {
//获取容器
Data data = dataRingBuffer.get(next);
//设置数据
data.setData(s);
} finally {
//插入
dataRingBuffer.publish(next);
}
}
}
消费者
package cn.lonecloud.procum.disruptor;
import cn.lonecloud.procum.Data;
import com.lmax.disruptor.WorkHandler;
/**
* @author lonecloud
* @version v1.0
* @date 下午3:01 2018/5/7
*/
public class Customer implements WorkHandler<Data> {
@Override
public void onEvent(Data data) throws Exception {
System.out.println(Thread.currentThread().getName()+"---"+data.getData());
}
}
数据工厂
package cn.lonecloud.procum.disruptor;
import cn.lonecloud.procum.Data;
import com.lmax.disruptor.EventFactory;
/**
* @author lonecloud
* @version v1.0
* @date 下午3:02 2018/5/7
*/
public class DataFactory implements EventFactory<Data> {
@Override
public Data newInstance() {
return new Data();
}
}
主函数
package cn.lonecloud.procum.disruptor;
import cn.lonecloud.procum.Data;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author lonecloud
* @version v1.0
* @date 下午3:09 2018/5/7
*/
public class Main {
public static void main(String[] args) throws InterruptedException {
//创建线程池
ExecutorService service = Executors.newCachedThreadPool();
//创建数据工厂
DataFactory dataFactory = new DataFactory();
//设置缓冲区大小,必须为2的指数,否则会有异常
int buffersize = 1024;
Disruptor<Data> dataDisruptor = new Disruptor<Data>(dataFactory, buffersize,
service);
//创建消费者线程
dataDisruptor.handleEventsWithWorkerPool(
new Customer(),
new Customer(),
new Customer(),
new Customer(),
new Customer(),
new Customer(),
new Customer()
);
//启动
dataDisruptor.start();
//获取其队列
RingBuffer<Data> ringBuffer = dataDisruptor.getRingBuffer();
for (int i = 0; i < 100; i++) {
//创建生产者
Producer producer = new Producer(ringBuffer);
//设置内容
producer.pushData(UUID.randomUUID().toString());
//Thread.sleep(1000);
}
}
}
其中策略有几种:
1. BlockingWaitStrategy:阻塞策略,最节省CPU,但是高并发条件下性能最糟糕
2 SleepingWaitStrategy:在循环中无限等待,处理数据会产生高延迟,对生产线程影响小,场景:异步日志
3. YieldingWaitStrategy:低延迟场合,使用必须保证剩余的消费者线程的逻辑CPU
4. BusySpinWaitStrategy:消费者线程会尽最大努力疯狂的监控缓冲区变化。