0
点赞
收藏
分享

微信扫一扫

java多线程 生产者消费者模型总结

sullay 2022-02-18 阅读 67

目录

生产者消费者模型简介

wait,notify方案

ReentrantLock的实现

阻塞队列的实现

使用信号量Semaphore实现

无锁的缓存框架: Disruptor


注意:本文参考  【Java总结】生产者消费者模型_zhujohnle的专栏-CSDN博客

生产者消费者模型Java实现 - 简书

消费者读取处理慢_Java高并发之Disruptor框架:单线程每秒处理600万订单高并发框架..._weixin_29628611的博客-CSDN博客

生产者消费者模型简介

生产者消费者模型主要结构如下,是一个典型的线程同步的案例。下面就来使用java做几种线程同步的方式来实现以下该模型

确保一个生产者消费者模型的稳定运行的前提有以下几个

生成者应该具备持续生成的能力

消费者应该具备持续消费的能力

生产者的生成和消费消费有一定的阀值,如生成总量到100需要停止生产,通知消费;消费到0的时候停止消费开始生产;

wait,notify方案

wait,notify方案 主要是通过,使用对象的wait方法和notify方法来实现线程的切换执行。其中我们可以看到对象的wait和notify或者notifyAll方法都是调用native的对应的方法来处理,追溯到最后也还是控制cpu进行不同的时间片的切换

下面这个例子比较简单,模拟一个生产速度大于消费速度的这样一个案例,在生产到阀值的时候停止生产通知消费者进行消费(wait)。消费者在消费到一定阀值的时候停止消费通知生产者进行生产(notifyall)

public class TestWaitNotifyConsumerAndProducer {

	/*当前生成数量*/
	static int currentNum = 0;
	/*最大生成数量*/
	static int MAX_NUM = 10;
	/*最小消费数量*/
	static int MIN_NUM = 0;
	/*wait和notify控制对象*/
	private static final String lock = "lock";

	public static void main(String args[]) {
		//创建一个生产者
		new Thread(new Producer()).start();

		//创建两个消费者
		new Thread(new Consumer()).start();
		new Thread(new Consumer()).start();
	}

	
	static class Producer implements Runnable {
		public void product() {
			while (true) {
				try {
					Thread.sleep(10);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}

				synchronized (lock) {
					currentNum++;
					System.out.println("Producer now product num:" + currentNum);
					lock.notifyAll();

					if (currentNum == MAX_NUM) {
						try {
							lock.wait();
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}

			}
		}

		@Override
		public void run() {
			product();
		}
	}

	static class Consumer implements Runnable {
		public void consume() {
			while (true) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				synchronized (lock) {
					if (currentNum == MIN_NUM) {
						lock.notifyAll();
						continue;
					}
					System.out.println(new StringBuilder(Thread.currentThread().getName())
							.append(" Consumer now consumption num:").append(currentNum));
					currentNum--;
				}
			}
		}

		@Override
		public void run() {
			consume();
		}
	}
}
public class ProducerConsumer1 {

    class Producer extends Thread {
        private String threadName;
        private Queue<Goods> queue;
        private int maxSize;

        public Producer(String threadName, Queue<Goods> queue, int maxSize) {
            this.threadName = threadName;
            this.queue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run() {
            while (true) {
                //模拟生产过程中的耗时操作
                Goods goods = new Goods();
                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                synchronized (queue) {
                    while (queue.size() == maxSize) {
                        try {
                            System.out.println("队列已满,【" + threadName + "】进入等待状态");
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    queue.add(goods);
                    System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                    queue.notifyAll();
                }
            }
        }
    }

    class Consumer extends Thread {
        private String threadName;
        private Queue<Goods> queue;

        public Consumer(String threadName, Queue<Goods> queue) {
            this.threadName = threadName;
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true) {
                Goods goods;
                synchronized (queue) {
                    while (queue.isEmpty()) {
                        try {
                            System.out.println("队列已空,【" + threadName + "】进入等待状态");
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    goods = queue.remove();
                    System.out.println("【" + threadName + "】消费了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                    queue.notifyAll();
                }
                //模拟消费过程中的耗时操作
                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    @Test
    public void test() {

        int maxSize = 5;
        Queue<Goods> queue = new LinkedList<>();

        Thread producer1 = new Producer("生产者1", queue, maxSize);
        Thread producer2 = new Producer("生产者2", queue, maxSize);
        Thread producer3 = new Producer("生产者3", queue, maxSize);

        Thread consumer1 = new Consumer("消费者1", queue);
        Thread consumer2 = new Consumer("消费者2", queue);


        producer1.start();
        producer2.start();
        producer3.start();
        consumer1.start();
        consumer2.start();

        while (true) {

        }
    }
}

1 确定锁的对象是队列queue

2 不要把生产过程和消费过程写在同步块中,这些操作无需同步,同步的仅仅是放入和取出这两个动作;

3 因为是持续生产,持续消费,要用while(true){...}的方式将【生产、放入】或【取出、消费】的操作都一直进行。

4 但由于是对队列使用synchronized的方式加锁,同一时刻,要么在放入,要么在取出,两者不能同时进行。

ReentrantLock的实现

ReentrantLock 也是java.util.concurrent中显示锁的一种,允许同一个线程重复进入一段执行代码(递归),并且反复lock加锁。重复加锁相当于计数器累加,因此当一个线程想释放这个锁的时候就需要有对应的unlock执行。

如下这段代码依然是我们开篇讲到的生成和消费的模型,只是换了一种实现;这里特别注意的是可重入锁或者递归锁需要成对的出现lock和unlock否则执行Condition的await 或者signal的时候就可能如下抛出

java.lang.IllegalMonitorStateException

public class TestReentrantLockConsumerAndProducer {

	/*当前生成数量*/
	static int currentNum = 0;
	/*最大生成数量*/
	static int MAX_NUM = 10;
	/*最小消费数量*/
	static int MIN_NUM = 0;
	//创建一个锁对象
	private static Lock lock = new ReentrantLock();
	//缓冲区已空的变量
	private static final Condition emptyCondition = lock.newCondition();
	//缓冲区已满的变量
	private static final Condition fullCondition = lock.newCondition();

	public static void main(String args[]) {
		//创建一个生产者
		new Thread(new Producer()).start();

		//创建两个消费者
		new Thread(new Consumer()).start();
		new Thread(new Consumer()).start();
	}


	static class Producer implements Runnable {
		public void product() {
			while (true) {
				try {
					Thread.sleep(10);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				lock.lock();
				currentNum++;
				System.out.println("Producer now product num:" + currentNum);
				if (currentNum == MAX_NUM) {
					emptyCondition.signal();
					try {
						fullCondition.await();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				lock.unlock();

			}
		}

		@Override
		public void run() {
			product();
		}
	}

	static class Consumer implements Runnable {
		public void consume() {
			while (true) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				lock.lock();
				if (currentNum == MIN_NUM) {
					fullCondition.signal();

					try {
						emptyCondition.await();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					continue;
				}
				System.out.println(new StringBuilder(Thread.currentThread().getName())
						.append(" Consumer now consumption num:").append(currentNum));
				currentNum--;

				lock.unlock();
			}
		}

		@Override
		public void run() {
			consume();
		}
	}
}

public class ProducerConsumer2 {

    class Producer extends Thread {

        private String threadName;
        private Queue<Goods> queue;
        private Lock lock;
        private Condition notFullCondition;
        private Condition notEmptyCondition;
        private int maxSize;

        public Producer(String threadName, Queue<Goods> queue, Lock lock, Condition notFullCondition, Condition notEmptyCondition, int maxSize) {
            this.threadName = threadName;
            this.queue = queue;
            this.lock = lock;
            this.notFullCondition = notFullCondition;
            this.notEmptyCondition = notEmptyCondition;
            this.maxSize = maxSize;

        }

        @Override
        public void run() {
            while (true) {
                //模拟生产过程中的耗时操作
                Goods goods = new Goods();
                try {
                    Thread.sleep(new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                lock.lock();
                try {
                    while (queue.size() == maxSize) {
                        try {
                            System.out.println("队列已满,【" + threadName + "】进入等待状态");
                            notFullCondition.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    queue.add(goods);
                    System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                    notEmptyCondition.signalAll();

                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Consumer extends Thread {
        private String threadName;
        private Queue<Goods> queue;
        private Lock lock;
        private Condition notFullCondition;
        private Condition notEmptyCondition;

        public Consumer(String threadName, Queue<Goods> queue, Lock lock, Condition notFullCondition, Condition notEmptyCondition) {
            this.threadName = threadName;
            this.queue = queue;
            this.lock = lock;
            this.notFullCondition = notFullCondition;
            this.notEmptyCondition = notEmptyCondition;
        }

        @Override
        public void run() {
            while (true) {
                Goods goods;
                lock.lock();
                try {
                    while (queue.isEmpty()) {
                        try {
                            System.out.println("队列已空,【" + threadName + "】进入等待状态");
                            notEmptyCondition.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    goods = queue.remove();
                    System.out.println("【" + threadName + "】消费了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                    notFullCondition.signalAll();

                } finally {
                    lock.unlock();
                }

                //模拟消费过程中的耗时操作
                try {
                    Thread.sleep(new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Test
    public void test() {

        int maxSize = 5;
        Queue<Goods> queue = new LinkedList<>();
        Lock lock = new ReentrantLock();
        Condition notEmptyCondition = lock.newCondition();
        Condition notFullCondition = lock.newCondition();

        Thread producer1 = new ProducerConsumer2.Producer("生产者1", queue, lock, notFullCondition, notEmptyCondition, maxSize);
        Thread producer2 = new ProducerConsumer2.Producer("生产者2", queue, lock, notFullCondition, notEmptyCondition, maxSize);
        Thread producer3 = new ProducerConsumer2.Producer("生产者3", queue, lock, notFullCondition, notEmptyCondition, maxSize);

        Thread consumer1 = new ProducerConsumer2.Consumer("消费者1", queue, lock, notFullCondition, notEmptyCondition);
        Thread consumer2 = new ProducerConsumer2.Consumer("消费者2", queue, lock, notFullCondition, notEmptyCondition);
        Thread consumer3 = new ProducerConsumer2.Consumer("消费者3", queue, lock, notFullCondition, notEmptyCondition);


        producer1.start();
        producer2.start();
        producer3.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
        while (true) {

        }
    }
}

要注意的地方:

放入和取出操作均是用的同一个锁,所以在同一时刻,要么在放入,要么在取出,两者不能同时进行。因此,与使用wait()和notify()实现类似,这种方式的实现并不能最大限度地利用缓冲区(即例子中的队列)。如果要实现同一时刻,既可以放入又可以取出,则要使用两个重入锁,分别控制放入和取出的操作,具体实现可以参考LinkedBlockingQueue

阻塞队列的实现

阻塞队列的实现本质上也还是基于可重入锁,只是进行了进一步的封装,有一个队列的数据结构

这里我们用到了一个数据结构LinkedBlockingDeque的双向队列的结构,当然我们使用任何一个阻塞队列都可以。这里主要用到阻塞队列超过最大容量的时候,自动阻塞等待;

这里使用了几个关键的方法peek,put,peekLast,take 这里大概看看相关源码。 这里源码我们不做过多解读,大概可以看到阻塞队列的内部实现也是依赖于可重入锁ReentrantLock,然后根据put和take的操作,动态的管理锁的获取和释放。

方法/方式处理抛出异常返回特殊值一直阻塞超时退出
插入add(e)offer(e)put(e)offer(e, time, unit)
移除remove()poll()take()poll(time, unit)
检查element()peek()不可用不可用

所以,在这里,要选用put()take()这两个会阻塞的方法。

  	/** Maximum number of items in the deque */
    private final int capacity;

    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();

    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();

    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();


 	public E peekFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (first == null) ? null : first.item;
        } finally {
            lock.unlock();
        }
    }
    
   public E peekLast() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (last == null) ? null : last.item;
        } finally {
            lock.unlock();
        }
    }

  //take元素的时候执行,可见如果当队列内容为空的时候阻塞
  public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
  /**
    * Removes and returns first element, or null if empty.
    * 具体删除操作,当操作删除完成以后执行notFull.signal();释放notFull的阻塞
    */
   private E unlinkFirst() {
       // assert lock.isHeldByCurrentThread();
       Node<E> f = first;
       if (f == null)
           return null;
       Node<E> n = f.next;
       E item = f.item;
       f.item = null;
       f.next = f; // help GC
       first = n;
       if (n == null)
           last = null;
       else
           n.prev = null;
       --count;
       notFull.signal();
       return item;
   }
  	/**
     * Links node as last element, or returns false if full.
     * 当执行成功添加时,释放notEmpty的信号
     */
    private boolean linkLast(Node<E> node) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity)
            return false;
        Node<E> l = last;
        node.prev = l;
        last = node;
        if (first == null)
            first = node;
        else
            l.next = node;
        ++count;
        notEmpty.signal();
        return true;
    }
public class TestBlockQueueConsumerAndProducer {

	/*最大生成数量*/
	static int MAX_NUM = 10;

	private static LinkedBlockingDeque mBlockQueue = new LinkedBlockingDeque<Integer>(MAX_NUM);


	public static void main(String args[]) {
		//创建一个生产者
		new Thread(new Producer()).start();

		//创建两个消费者
		new Thread(new Consumer()).start();
		//创建两个消费者
		new Thread(new Consumer()).start();
	}

	static class Producer implements Runnable {
		public void product() {
			while (true) {
				if (mBlockQueue.peek() == null) {
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					for (int i = 1; i <= 10; i++) {
						try {
							mBlockQueue.put(i);
							System.out.println("Producer now product num:" + i);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
			}
		}

		@Override
		public void run() {
			product();
		}
	}


	static class Consumer implements Runnable {
		public void consume() {
			while (true) {
				try {
					Thread.sleep(500);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				Integer mLast = (Integer) mBlockQueue.peekLast();
				if (mLast != null && mLast == MAX_NUM) {
					try {
						int num = (Integer) mBlockQueue.take();
						System.out.println(new StringBuilder(Thread.currentThread().getName())
								.append(" Consumer now consumption num:").append(num));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}

			}
		}

		@Override
		public void run() {
			consume();
		}
	}
}

public class ProducerConsumer3 {

    class Producer extends Thread {
        private String threadName;
        private BlockingQueue<Goods> queue;

        public Producer(String threadName, BlockingQueue<Goods> queue) {
            this.threadName = threadName;
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true){
                Goods goods = new Goods();
                try {
                    //模拟生产过程中的耗时操作
                    Thread.sleep(new Random().nextInt(100));
                    queue.put(goods);
                    System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer extends Thread {
        private String threadName;
        private BlockingQueue<Goods> queue;

        public Consumer(String threadName, BlockingQueue<Goods> queue) {
            this.threadName = threadName;
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true){
                try {
                    Goods goods = queue.take();
                    System.out.println("【" + threadName + "】消费了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                    //模拟消费过程中的耗时操作
                    Thread.sleep(new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Test
    public void test() {

        int maxSize = 5;
        BlockingQueue<Goods> queue = new LinkedBlockingQueue<>(maxSize);

        Thread producer1 = new ProducerConsumer3.Producer("生产者1", queue);
        Thread producer2 = new ProducerConsumer3.Producer("生产者2", queue);
        Thread producer3 = new ProducerConsumer3.Producer("生产者3", queue);

        Thread consumer1 = new ProducerConsumer3.Consumer("消费者1", queue);
        Thread consumer2 = new ProducerConsumer3.Consumer("消费者2", queue);


        producer1.start();
        producer2.start();
        producer3.start();
        consumer1.start();
        consumer2.start();

        while (true) {
        }
    }
}

如果使用LinkedBlockingQueue作为队列实现,则可以实现:在同一时刻,既可以放入又可以取出,因为LinkedBlockingQueue内部使用了两个重入锁,分别控制取出和放入。

如果使用ArrayBlockingQueue作为队列实现,则在同一时刻只能放入或取出,因为ArrayBlockingQueue内部只使用了一个重入锁来控制并发修改操作。

使用信号量Semaphore实现

前提是熟悉信号量Semaphore的使用方式,尤其是release()方法,Semaphorerelease之前不必一定要先acquire

public class ProducerConsumer4 {


    class Producer extends Thread {
        private String threadName;
        private Queue<Goods> queue;
        private Semaphore queueSizeSemaphore;
        private Semaphore concurrentWriteSemaphore;
        private Semaphore notEmptySemaphore;

        public Producer(String threadName, Queue<Goods> queue, Semaphore concurrentWriteSemaphore, Semaphore queueSizeSemaphore, Semaphore notEmptySemaphore) {
            this.threadName = threadName;
            this.queue = queue;
            this.concurrentWriteSemaphore = concurrentWriteSemaphore;
            this.queueSizeSemaphore = queueSizeSemaphore;
            this.notEmptySemaphore = notEmptySemaphore;
        }

        @Override
        public void run() {
            while (true) {
                //模拟生产过程中的耗时操作
                Goods goods = new Goods();
                try {
                    Thread.sleep(new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                try {
                    queueSizeSemaphore.acquire();//获取队列未满的信号量
                    concurrentWriteSemaphore.acquire();//获取读写的信号量
                    queue.add(goods);
                    System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    concurrentWriteSemaphore.release();
                    notEmptySemaphore.release();
                }
            }
        }
    }

    class Consumer extends Thread {
        private String threadName;
        private Queue<Goods> queue;
        private Semaphore queueSizeSemaphore;
        private Semaphore concurrentWriteSemaphore;
        private Semaphore notEmptySemaphore;

        public Consumer(String threadName, Queue<Goods> queue, Semaphore concurrentWriteSemaphore, Semaphore queueSizeSemaphore, Semaphore notEmptySemaphore) {
            this.threadName = threadName;
            this.queue = queue;
            this.concurrentWriteSemaphore = concurrentWriteSemaphore;
            this.queueSizeSemaphore = queueSizeSemaphore;
            this.notEmptySemaphore = notEmptySemaphore;
        }

        @Override
        public void run() {
            while (true) {
                Goods goods;
                try {
                    notEmptySemaphore.acquire();
                    concurrentWriteSemaphore.acquire();
                    goods = queue.remove();
                    System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    concurrentWriteSemaphore.release();
                    queueSizeSemaphore.release();
                }

                //模拟消费过程中的耗时操作
                try {
                    Thread.sleep(new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    @Test
    public void test() {

        int maxSize = 5;
        Queue<Goods> queue = new LinkedList<>();
        Semaphore concurrentWriteSemaphore = new Semaphore(1);
        Semaphore notEmptySemaphore = new Semaphore(0);
        Semaphore queueSizeSemaphore = new Semaphore(maxSize);


        Thread producer1 = new ProducerConsumer4.Producer("生产者1", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
        Thread producer2 = new ProducerConsumer4.Producer("生产者2", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
        Thread producer3 = new ProducerConsumer4.Producer("生产者3", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);

        Thread consumer1 = new ProducerConsumer4.Consumer("消费者1", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
        Thread consumer2 = new ProducerConsumer4.Consumer("消费者2", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
        Thread consumer3 = new ProducerConsumer4.Consumer("消费者3", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);


        producer1.start();
        producer2.start();
        producer3.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
        while (true) {
        }
    }
}

1 理解代码中的三个信号量的含义
queueSizeSemaphore:(其中的许可证数量,可以理解为队列中可以再放入多少个元素),该信号量的许可证初始数量为仓库大小,即maxSize;生产者每放置一个商品,则该信号量-1,即执行acquire(),表示队列中已经添加了一个元素,要减少一个许可证;消费者每取出一个商品,该信号量+1,即执行release(),表示队列中已经少了一个元素,再给你一个许可证。

notEmptySemaphore:(其中的许可证数量,可以理解为队列中可以取出多少个元素),该信号量的许可证初始数量为0;生产者每放置一个商品,则该信号量+1,即执行release(),表示队列中添加了一个元素;消费者每取出一个商品,该信号量-1,即执行acquire(),表示队列中已经少了一个元素,要减少一个许可证;

concurrentWriteSemaphore,相当于一个写锁,在放入或取出商品的时候,都需要先获取再释放许可证。

2 由于实现中,使用了concurrentWriteSemaphore实现了对队列并发写的控制,在同一时刻,只能对队列进行一种操作:放入或取出。假如把concurrentWriteSemaphore中的信号量初始化为2或者2以上的值,就会出现多个生产者同时放入或多个消费者同时消费的情况,而使用的LinkedList是不允许并发进行这种修改的,否则会出现溢出或取空的情况。所以,concurrentWriteSemaphore只能设置为1,也就导致性能与使用wait() / notify()方式类似,性能不高。

无锁的缓存框架: Disruptor

BlockingQueue 实现生产者和消费者模式简单易懂,但是BlockingQueue并不是一个高性能的实现:它完全使用锁和阻塞来实现线程之间的同步。在高并发的场合,它的性能并不是特别的优越。(ConconcurrentLinkedQueue是一个高性能的队列,但并不没有实现BlockingQueue接口,即不支持阻塞操作)。

Disruptor是LMAX公司开发的高效的无锁缓存队列。它使用无锁的方式实现了一个环形队列,非常适合于实现生产者和消费者模式,如:事件和消息的发布。

1. 弃用锁机制转而使用CAS。

Disruptor论文中讲述了我们所做的一个实验。这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。当单线程无锁时,程序耗时300ms。如果增加一个锁(仍是单线程、没有竞争、仅仅增加锁),程序需要耗时10000ms,慢了两个数量级。更令人吃惊的是,如果增加一个线程(简单从逻辑上想,应该比单线程加锁快一倍),耗时224000ms。

所以锁的开销实际上已经被证实是非常的大了,如果减少锁的使用,降低锁的粒度,这是disruptor提供给我们的另外一种思路。

2. 为了解决伪共享而引入缓存行填充。

伪共享讲的是多个CPU时的123级缓存的问题,通常,缓存是以缓存行的方式读取数据,如图,当两个CPU同样都把X,Y load到自己的一级缓存时,实际上CPU为了争用X,Y还是会在写入时发生竞争,这样同样会导致锁,效率下降这些问题。

如何解决这个问题呢?实际上使用的是与内存对齐一样的方法,就是每次把数据对齐到跟缓存行(通常是64B)一样大小。其实内存对齐和解决伪共享的缓存行对齐其实是同一种思路。所以在disruptor源码中可以看到这样一个内部类,它为什么写成这样,其实就是为了缓存行对齐。

private static class Padding

{
/** Set to -1 as sequence starting point */

public long nextValue = Sequence.INITIAL_VALUE, cachedValue = Sequence.INITIAL_VALUE, p2, p3, p4, p5, p6, p7;

}

3. 使用一种独特的数据结构RingBuffer来代替Queue。这个数据结构的使用使得弃用锁转而用CAS成为了可能。

大家可以想象,如果要自己编写一段生产者消费者的程序,你会怎么做呢?

当然了,大多数人都会用一个队列来实现,说白了,就是把队列作为生产者和消费者之间的缓冲,从而间接的同步了他们的速度,使得异步编程成为可能。

说到异步编程,可能大家都再熟悉不过了,实际上java的图形界面就是一个典型的异步编程的例子,作为处理界面事件的消费者其实只有一个线程,而更新界面的生产者会是多个线程,由于采用了生产者消费者模型,生产者生产的界面事件会保存在一个队列里,由这个内置的线程统一去做更新,这样就保证了我们编程时不会人为破坏界面绘制的过程,从而提高代码质量,降低复杂度。顺带一提,.net的delegate也是同样的道理,如果用其他线程更新界面会出现很多怪异的问题,所以delegate通过一个事件队列,同样实现了消费者在一个单独的线程中。

扯远了。总之我们会使用一个队列来处理这种问题,这里存在几个问题,第一,如果生产者生产过快,一定时间后,队列会变得过度膨胀,占用内存空间;第二,看过队列的实现会发现,为了保证多个线程访问的正确性,在操作队列时是一定要加锁的,前面也说了,加锁以后时间会慢几个量级。

disruptor框架的设计带我们走出了这个思维定势。我们一定要用不断增长的队列吗?我们访问队列一定要加锁吗?答案都是否定的。

所谓的ringbuffer,就是一个环形队列。我们来看看为了解决这两个问题他是如何做的。

1. 一个环形队列,意味着首尾相连,也就是他的大小是有限制的,但是ringbuffer是基于这样一个假设:即生产者和消费者都在同步往前走,不存在某边特别快,这样这个对列就可以循环使用,不必有创建对象的开销,另外,ringbuffer由于是固定大小的,使用数组来保存,由于预先分配好了空间,所以定位和查找的速度自然不必说。所以它自然的解决了第一个问题。但是如果实际使用中确实是生产者快于消费者(或者反过来)呢?也就是说,在某个时间点,他们一定会首尾相遇,这时候会发生什么呢?这个问题我们随后解释。

2.为了让队列安全的在多线程环境运行,需要整个队列上锁,带来的开销是巨大的。来看ringbuffer是如何无锁的(简化起见,讨论一个生产者一个消费者的情况)。为了解释这个原理,这里借用别人的图来阐述这个问题。比如目前有一个consumer,停留在位置12,这时producer假设在位置3,这时producer的下一步是如何处理的呢?producer会尝试读取4,发现没有到下一个consumer,所以可以安全获取,于是将之改为14,并且将产品publish到14,并且通知一个阻塞的consumer起来活动。如此一直到11都是安全的(这里我们假设生产者比较快),当producer尝试访问12时发现不能继续,于是交出控制权;而consumer开始移动时,会调用barrier的waitFor方法,waitFor看到前面最近的安全节点已经到了20(21是producer),于是直接返回20,所以现在consumer可以无锁的去消费13到20的所有产品,可以想象,这种方式比起synchronized要快上很多倍。

生产者

package cn.lonecloud.procum.disruptor;
 
import cn.lonecloud.procum.Data;
import com.lmax.disruptor.RingBuffer;
 
import java.nio.ByteBuffer;
 
/**
 * @author lonecloud
 * @version v1.0
 * @date 下午3:02 2018/5/7
 */
public class Producer {
 
    //队列
    private final RingBuffer<Data> dataRingBuffer;
 
    public Producer(RingBuffer<Data> dataRingBuffer) {
        this.dataRingBuffer = dataRingBuffer;
    }
 
    /**
     * 插入数据
     * @param s
     */
    public void pushData(String s) {
 
        //获取下一个位置
        long next = dataRingBuffer.next();
        try {
            //获取容器
            Data data = dataRingBuffer.get(next);
            //设置数据
            data.setData(s);
        } finally {
            //插入
            dataRingBuffer.publish(next);
        }
    }
}

消费者

package cn.lonecloud.procum.disruptor;
 
import cn.lonecloud.procum.Data;
import com.lmax.disruptor.WorkHandler;
 
/**
 * @author lonecloud
 * @version v1.0
 * @date 下午3:01 2018/5/7
 */
public class Customer implements WorkHandler<Data> {
    @Override
    public void onEvent(Data data) throws Exception {
        System.out.println(Thread.currentThread().getName()+"---"+data.getData());
    }
}

数据工厂

package cn.lonecloud.procum.disruptor;
 
import cn.lonecloud.procum.Data;
import com.lmax.disruptor.EventFactory;
 
/**
 * @author lonecloud
 * @version v1.0
 * @date 下午3:02 2018/5/7
 */
public class DataFactory implements EventFactory<Data> {
 
    @Override
    public Data newInstance() {
        return new Data();
    }
}

主函数

package cn.lonecloud.procum.disruptor;
 
import cn.lonecloud.procum.Data;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
 
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
/**
 * @author lonecloud
 * @version v1.0
 * @date 下午3:09 2018/5/7
 */
public class Main {
    public static void main(String[] args) throws InterruptedException {
        //创建线程池
        ExecutorService service = Executors.newCachedThreadPool();
        //创建数据工厂
        DataFactory dataFactory = new DataFactory();
        //设置缓冲区大小,必须为2的指数,否则会有异常
        int buffersize = 1024;
        Disruptor<Data> dataDisruptor = new Disruptor<Data>(dataFactory, buffersize,
                service);
        //创建消费者线程
        dataDisruptor.handleEventsWithWorkerPool(
                new Customer(),
                new Customer(),
                new Customer(),
                new Customer(),
                new Customer(),
                new Customer(),
                new Customer()
        );
        //启动
        dataDisruptor.start();
        //获取其队列
        RingBuffer<Data> ringBuffer = dataDisruptor.getRingBuffer();
        for (int i = 0; i < 100; i++) {
            //创建生产者
            Producer producer = new Producer(ringBuffer);
            //设置内容
            producer.pushData(UUID.randomUUID().toString());
            //Thread.sleep(1000);
        }
    }
}

其中策略有几种:

1. BlockingWaitStrategy:阻塞策略,最节省CPU,但是高并发条件下性能最糟糕

2 SleepingWaitStrategy:在循环中无限等待,处理数据会产生高延迟,对生产线程影响小,场景:异步日志

3. YieldingWaitStrategy:低延迟场合,使用必须保证剩余的消费者线程的逻辑CPU

4. BusySpinWaitStrategy:消费者线程会尽最大努力疯狂的监控缓冲区变化。

举报

相关推荐

0 条评论