0
点赞
收藏
分享

微信扫一扫

JUC实现生产者消费者队列

booksmg2014 2022-07-18 阅读 92


       

LinkedBlockingQueue

阻塞类:

take() 出队列头部,如果取不到,会一直阻塞

put(E)  入队列尾部,如果进不去,会一直阻塞

不报错类

offer(E) 入队列,成功true,满了返回false

poll()   出队列,没有元素返回 null

报错类

add(E) 满了报错

remove() 无元素报错

        在内存中,J.U.C提供的ThreadPoolExecutor可支持排队,通过BlockingQueue暂存还没有来得及执行的任务。

关于 LinkedBlockQueue
      这是一个阻塞的线程安全的队列,底层应该采用链表实现
      入队列:(尾部添加)
      add方法在添加元素的时候,若超出了度列的长度会直接抛出异常:
      put方法,若向队尾添加元素的时候发现队列已经满了会一直阻塞!
      offer方法在添加元素时,如果发现队列已满无法添加的话,会直接返回false。

      出队列(头部取出并移除)
      poll: 若队列为空,返回null。
      remove:若队列为空,抛出NoSuchElementException异常。
      take:若队列为空,发生阻塞,等待有元素。

      构造函数如果不接受参数,为无界队列.如果添加速度快于移除速度,则很可能OOM
      初始化的时候可增加队列长度限制,队列长度会对上述三种添加方法产生影响.
     

protected static LinkedBlockingQueue queue = new LinkedBlockingQueue();

        用户的任务进入队列后,一般使用线程池来并发调度任务的消费。

static final ExecutorService pool = new ThreadPoolExecutor(2, 10,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(256));

关于线程池ThreadPoolExecutor

        corePoolSize:没有任务执行的时候的线程数量。相当于常驻工作组。

        maxPoolSize:如果队列已经满了,并且当前线程数量少于corePoolSize,那么就会创建新的线程。但是总线程数量不会超过maxPoolSize。

        如下代码提供了一个push方法,将用户提交的任务加入了queue队列中。该队列的构造函数没有数字,说明目前是一个无界队列。 

public class ProducerQueue ...

protected static LinkedBlockingQueue queue = new LinkedBlockingQueue();

static final ExecutorService pool = new ThreadPoolExecutor(2, 10,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(256));


public static void push(Object mission) {
if (Objects.nonNull(mission)) {
queue.add(mission);
pool.execute(() -> VideoProducerQueue.notifies());
}
}

       任务入队列后,调用线程池通知消费者开始处理用户任务。

private static void notifies() {
while (!queue.isEmpty()) {
Object target = queue.poll();
if(target !=null ){
new VideoConsumer(target).handle();
}
}
}


public class VideoConsumer {
private static final Logger LOGGER =
LoggerFactory.getLogger(VideoConsumer.class);

private Object product;

public VideoConsumer(Object product) {
this.product = product;
}

//内部也可以维护一个消费者队列

public void handle() {
try {
LOGGER.info("消费对象" + product + ",入库、记录、处理....");
Thread.sleep(3000);
} catch (Exception ex) {
LOGGER.error("消费" + product + "失败", ex);
}
}
}

      

举报

相关推荐

0 条评论