0
点赞
收藏
分享

微信扫一扫

JAVA多线程-Disruptor单机最快的MQ

琛彤麻麻 2022-04-30 阅读 56

Disruptor

Disruptor叫无锁、高并发、环形Buffer,直接覆盖(不用清除)旧的数据,降低GC频率,用于生产者消费者模式(如果说按照设计者角度来讲他就是观察者模式)。什么叫观察者模式,想象一下,我们在前面学各种各样的队列的时候,队列就是个容器,好多生产者往里头扔东西,好多消费者从里头往外拿东西。所谓的生产者消费者就是这个意思,为什么我们可以叫他观察者呢,因为这些消费者正在观察着里面有没有新东西,如果有的话我马上拿过来消费,所以他也是一种观察者模式。Disruptor实现的就是这个容器

  • 对比ConcurrentLinkedQueue : 链表实现
  • JDK中没有ConcurrentArrayQueue
  • Disruptor是数组实现的
  • 无锁,高并发,使用环形Buffer,直接覆盖(不用清除)旧的数据,降低GC频率
  • 实现了基于事件的生产者消费者模式(观察者模式)

RingBuffer

ConcurrentLinkedQueue这里面就是一个一个链表,这个链表遍历起来肯定没有数组快,这个是一点。还有第二点就是这个链表要维护一个头指针和一个尾指针,我往头部加的时候要加锁,往尾部拿的时候也要加锁。另外链表本身效率就偏低,还要维护两个指针。关于环形的呢,环形本身就维护一个位置,这个位置称之为sequence序列,这个序列代表的是我下一个有效的元素指在什么位置上,就相当于他只有一个指针来回转。加在某个位置上怎么计算:直接用那个数除以我们整个的容量求余就可以了。

  • 环形队列
  • RingBuffer的序号,指向下一个可用的元素
  • 采用数组实现,没有首尾指针
  • 对比ConcurrentLinkedQueue,用数组实现的速度更快

那我生产者线程生产的特别多,消费者没来得及消费那我在往后覆盖的话怎么办?不会那么轻易的让你覆盖的,我们是有策略的,我生产者生产满了,要在生产一个的话就马上覆盖这个位置上的数了。这时候是不能覆盖的,指定了一个策略叫等待策略,这里面有8中等待策略,分情况自己去用。最常见的是BlockingWait,满了我就在这等着,什么时候你空了消费者来唤醒一下就继续。

Disruptor开发步骤

1.引入Maven依赖

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.6</version>
        </dependency>

2.定义Event-队列中需要处理的元素。
在Disruptor他是每一个消息都认为是一个事件,在他这个概念里就是一个事件,所以在这个环形队列里面存的是一个一个的Event。
重写了toString方法,输出对象的时候增加可读性。

public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value;
    }

    @Override
    public String toString() {
        return "LongEvent{" +
                "value=" + value +
                '}';
    }
}

3.定义Event工厂,用于填充队列
指定Event的工厂生产LongEvent,实现EventFactory接口重写newInstance方法。
这里牵扯效率问题,因为Disruptor初始化的时候会调用Event工厂,对ringBuffer进行内存的提前分配,GC频率会降低。

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {

    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

4.定义EventHandler(消费者),处理容器中的元素
那这个Event怎么消费呢,就需要指定Event的消费者EventHandler。
重写EventHandler接口中的onEvent方法


import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {

    /**
     *
     * @param event 要处理的消息
     * @param sequence RingBuffer的序号
     * @param endOfBatch 是否为最后一个元素 如果是true的话消费者就可以退出了,如果是false的话说明后面还有继续消费。
     * @throws Exception
     */

    public static long count = 0;

    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        count ++;
        System.out.println("[" + Thread.currentThread().getName() + "]:" + event + " :序号:" + sequence);
    }
}

public class Main01
{
    public static void main(String[] args) throws Exception
    {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor  factory产生消息的工厂 defaultThreadFactory线
        //程工厂,指的是当他要产生消费者的时候,当要调用这个消费者的时候他是在一个特定的线程里执行
        //的,这个线程就是通过defaultThreadFactory来产生;
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory());

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();//获取环形数组

        //官方例程 /*通过next找到下一个可用的位置,最开始这个
        //环是空的,下一个可用的位置是0这个位置,拿到这个位置之后直接去ringBuffer里面get(0)这个位置上
        //的event。如果说你要是追求效率的极致,你应该是一次性全部初始化好,你get的时候就不用再去判
        //断,如果你想做一个延迟,很不幸的是你每次都要做判断是不是初始化了。get的时候就是拿到一个
        //event,这个是我们new出来的默认的,但是我们可以改里面的event.set( 值...),填好数据之后
        //ringBuffer.publish发布生产。*/
        /**/
        long sequence = ringBuffer.next();  // Grab the next sequence  找到下一个位置下标

        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor  获取预先初始化好的event
            // for the sequence
            event.set(8888L);  // Fill with data 填充数据 
        }先找到下标 然后把这个位置上的event拿出来再填充数据 然后发布消息
        finally
        {
            ringBuffer.publish(sequence);//生产者发布
        }

    }
}

02小程序使用translator,就是怎么样构建这个消息,原来我们都是用消息的factory,但是
下面这次我们用translator对他进行构建,就是把某一些数据翻译成消息。前面产生event工厂还是一样,然后bufferSize,后面再扔的是DaemonThreadFactory就是后台线程了,new LongEventHandler然后start拿到他的ringBuffer,前面都一样。只有一个地方叫EventTranslator不一样,我们在main01里面的代码是要写try catch然后把里面的值给设好,相当于把这个值转换成event对象。相对简单的写法,它会把某些值转成一个LongEvent,通过EventTranslator。new出来后实现了translateTo方法,EventTranslator他本身是一个接口,所以你要new的时候你又要实现它里面没有实现的方法,
translateTo的意思是你给我一个Event,我会把这个Event给你填好。
ringBuffer.publishEvent(translator1) 你只要把translator1交个ringBuffer就可以了。这个translator就是为了迎合Lambda表达式的写法(为java8的写法做准备)
EventTranslatorOneArg只有带一个参数的EventTranslator。我带有一个参数,这个参数会通过我的translateTo方法转换成一个LongEvent
既然有EventTranslatorOneArg就有EventTranslatorTwoArg、EventTranslatorThreeArg,还有EventTranslatorVararg多了去了Vararg就是有好多个值,我把里面的值全都给你加起来最后把结果set到event里面。


public class Main02
{
    public static void main(String[] args) throws Exception
    {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        //=============================================================== 为Java8的写法做准备
        EventTranslator<LongEvent> translator1 = new EventTranslator<LongEvent>() {//用translator对他进行构建,就是把某一些数据翻译成消息
            @Override
            public void translateTo(LongEvent event, long sequence) {
                event.set(8888L);
            }
        };

        ringBuffer.publishEvent(translator1);
        //this.sequencer.publish(sequence);

        //===============================================================
        EventTranslatorOneArg<LongEvent, Long> translator2 = new EventTranslatorOneArg<LongEvent, Long>() {
            @Override
            public void translateTo(LongEvent event, long sequence, Long l) {
                event.set(l);
            }
        };

        ringBuffer.publishEvent(translator2, 7777L);

        //===============================================================
        EventTranslatorTwoArg<LongEvent, Long, Long> translator3 = new EventTranslatorTwoArg<LongEvent, Long, Long>() {
            @Override
            public void translateTo(LongEvent event, long sequence, Long l1, Long l2) {
                event.set(l1 + l2);
            }
        };

        ringBuffer.publishEvent(translator3, 10000L, 10000L);

        //===============================================================
        EventTranslatorThreeArg<LongEvent, Long, Long, Long> translator4 = new EventTranslatorThreeArg<LongEvent, Long, Long, Long>() {
            @Override
            public void translateTo(LongEvent event, long sequence, Long l1, Long l2, Long l3) {
                event.set(l1 + l2 + l3);
            }
        };

        ringBuffer.publishEvent(translator4, 10000L, 10000L, 1000L);

        //===============================================================
        EventTranslatorVararg<LongEvent> translator5 = new EventTranslatorVararg<LongEvent>() {

            @Override
            public void translateTo(LongEvent event, long sequence, Object... objects) {
                long result = 0;
                for(Object o : objects) {
                    long l = (Long)o;
                    result += l;
                }
                event.set(result);
            }
        };

        ringBuffer.publishEvent(translator5, 10000L, 10000L, 10000L, 10000L);

    }
}

Lambda表达式怎么写,这个是比较简洁的写法,连factory都省了,直接指定一个Lambda表达式LongEvent::new。继续handleEventsWith把三个参数传进来后面写好Lambda表达式直接打印,然后start, 接着RingBuffer,publishEvent原来我们还有写try…catch,现在简单了直接ringBuffer.publishEvent(第一个是lambda表达式,表达式后是你指定的几个参数),所以现在的这种写法就不定义各种各样的EventTranslator了。

public class Main03
{
    public static void main(String[] args) throws Exception
    {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();


        //ringBuffer.publishEvent((event, sequence) -> event.set(10000L));
        ringBuffer.publishEvent((event, sequence, l) -> event.set(l), 10000L);
        ringBuffer.publishEvent((event, sequence, l1, l2) -> event.set(l1 + l2),
                10000L, 10000L);

        System.in.read();
    }
}

ProducerType生产者线程模式

  • ProducerType有两种模式ProducerMULTI和Producer.SINGLE
  • 默认是MULTI,表示在多线程模式下产生sequence
  • 如果确认是单线程生产者,那么可以指定SINGLE,效率会提升
  • 如果是多个生产者(多线程),但模式指定为SINGLE,会出什么问题?

单线程不加锁,最后小于5000


public class Main04_ProducerType
{
    public static void main(String[] args) throws Exception
    {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        // Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory());

        //指定单线程模式
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(),
                ProducerType.SINGLE, new BlockingWaitStrategy());//ProducerType.SINGLE   ProducerType.MULTI

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        //================================================================================================
        final int threadCount = 50;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);
        ExecutorService service = Executors.newCachedThreadPool();
        for (long i = 0; i < threadCount; i++) {
            final long threadNum = i;
            service.submit(()-> {
                System.out.printf("Thread %s ready to start!\n", threadNum );
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }

                for (int j = 0; j < 100; j++) {
                    ringBuffer.publishEvent((event, sequence) -> {
                        event.set(threadNum);
                        System.out.println("生产了 " + threadNum);
                    });
                }


            });
        }

        service.shutdown();
        //disruptor.shutdown();
        TimeUnit.SECONDS.sleep(3);
        System.out.println(LongEventHandler.count);
    }

}

等待策略

生产者的生产速度大于消费者的消费速度,生产者要等待消费者消费消息。
生产者线程生产的特别多,消费者没来得及消费那我在往后覆盖的话怎么办?不会那么轻易的让你覆盖的,我们是有策略的,我生产者生产满了,要在生产一个的话就马上覆盖这个位置上的数了。这时候是不能覆盖的,指定了一个策略叫等待策略,这里面有8中等待策略,分情况自己去用。最常见的是BlockingWait,满了我就在这等着,什么时候你空了消费者来唤醒一下就继续。

  • (常用)BlockingWaitStrategy:通过线程堵塞的方式,等待生产者唤醒,被唤醒后,再循环检依赖的sequence是否已经消费。
  • BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu
  • LiteBlockingWaitStrategy:线程阻塞等待生产者唤醒,与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,如果两个线程同时访问一个访问waitfor,一个访问signalAll时,可减少lock加锁次数
  • LiteTimeoutBlockingWaitStrategy:与LiteBlockingWaitStrategy相比,设置了阻塞时间,超过时间后抛出异常
  • PhasedBackoffWaitStrategy:根据时间参数和传入的等待策略来决定使用那种等待策略
  • TimeoutBlockingWaitStrategy:相对于BlockingWaitStrategy来说,设置了等待时间,超过后抛出异常
  • (常用)YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu
  • (常用)SleepingWaitStrategy:sleep

BlockingWaitStrategy满了就等着;SleepingWaitStrategy满了就睡一觉,睡醒了看看能不
能继续执行了;YieldingWaitStrategy让出cpu,让你消费者赶紧消费,消费完了之后我又回来看看我是不是又能生产了;一般YieldingWaitStrategy效率是最高的,但也要看实际情况适用不适用。
50个线程,每个线程生产100个消息,然后5000个被消费


public class Main05_WaitStrategy
{
    public static void main(String[] args) throws Exception
    {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(),
                ProducerType.MULTI, new SleepingWaitStrategy());

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        //================================================================================================
        final int threadCount = 50;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);
        ExecutorService service = Executors.newCachedThreadPool();
        for (long i = 0; i < threadCount; i++) {
            final long threadNum = i;
            service.submit(()-> {
                System.out.printf("Thread %s ready to start!\n", threadNum );
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }

                for (int j = 0; j < 100; j++) {
                    ringBuffer.publishEvent((event, sequence) -> {
                        event.set(threadNum);
                        System.out.println("生产了 " + threadNum);
                    });
                }


            });
        }

        service.shutdown();
        //disruptor.shutdown();
        TimeUnit.SECONDS.sleep(3);
        System.out.println("LongEventHandler.count:"+LongEventHandler.count);
    }

}

多个消费者


public class Main06_MultiConsumer
{
    public static void main(String[] args) throws Exception
    {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(),
                ProducerType.MULTI, new SleepingWaitStrategy());

        // Connect the handlers
        LongEventHandler h1 = new LongEventHandler();
        LongEventHandler h2 = new LongEventHandler();
        disruptor.handleEventsWith(h1, h2);
/*public EventHandlerGroup<T> handleEventsWith(EventHandler... handlers) {
        return this.createEventProcessors(new Sequence[0], handlers);
    }*/
        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        //================================================================================================
        final int threadCount = 10;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);
        ExecutorService service = Executors.newCachedThreadPool();
        for (long i = 0; i < threadCount; i++) {
            final long threadNum = i;
            service.submit(()-> {
                System.out.printf("Thread %s ready to start!\n", threadNum );
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }

                for (int j = 0; j < 10; j++) {
                    ringBuffer.publishEvent((event, sequence) -> {
                        event.set(threadNum);
                        System.out.println("生产了 " + threadNum);
                    });
                }


            });
        }

        service.shutdown();
        //disruptor.shutdown();
        TimeUnit.SECONDS.sleep(3);
        System.out.println(LongEventHandler.count);//200 因为一个消息被一个消费者分别消费一次
    }

}

消费者异常处理

默认:disruptor.setDefaultExceptionHandler()
覆盖:disruptor.handleExceptionFor().with()
这里方法里写了一个EventHandler是我们的消费者,在消费者里打印了event之后马上抛出了异常,当我们消费者出现异常之后你不能让整个线程停下来,有一个消费者出了异常那其他的消费者就不干活了,肯定不行。handleExceptionsFor为消费者指定Exception处理器 (h1).with后面是我们的ExceptionHandler出了异常之后该怎么办进行处理,重写三个方法,第一个是当产生异常的时候在这很简单直接打印出来了;第二个是handleOnStart如果启动的时候出异常;第三个handleOnShutdown你该怎么处理。


public class Main07_ExceptionHandler
{
    public static void main(String[] args) throws Exception
    {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(),
                ProducerType.MULTI, new SleepingWaitStrategy());

        // Connect the handlers
        EventHandler h1 = (event, sequence, end) -> {
            System.out.println(event);
            throw new Exception("消费者出异常");
        };
        disruptor.handleEventsWith(h1);

        disruptor.handleExceptionsFor(h1).with(new ExceptionHandler<LongEvent>() {
            @Override
            public void handleEventException(Throwable throwable, long l, LongEvent longEvent) {
                throwable.printStackTrace();
            }

            @Override
            public void handleOnStartException(Throwable throwable) {
                System.out.println("Exception Start to Handle!");
            }

            @Override
            public void handleOnShutdownException(Throwable throwable) {
                System.out.println("Exception Handled!");
            }
        });

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        //================================================================================================
        final int threadCount = 1;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);
        ExecutorService service = Executors.newCachedThreadPool();
        for (long i = 0; i < threadCount; i++) {
            final long threadNum = i;
            service.submit(()-> {
                System.out.printf("Thread %s ready to start!\n", threadNum );
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }

                for (int j = 0; j < 1; j++) {
                    ringBuffer.publishEvent((event, sequence) -> {
                        event.set(threadNum);
                        System.out.println("生产了 " + threadNum);
                    });
                }


            });
        }

        service.shutdown();
        //disruptor.shutdown();
        TimeUnit.SECONDS.sleep(3);
        System.out.println(LongEventHandler.count);
    }
}

总结

disruptor是一个环,然后这个环有多个生产者可以往里头生产,由于它是环形的设计效率会非常的高,我们写程序的时候是这样写的,首先你自己定义好Event消息的格式,然后定义消息工厂,消息工厂是用来初始化整个环的时候相应的一些位置上各种各样不同的消息先把它new出来,new出来之后先占好空间,我们在生产的时候只需要把这个位置上这个默认的这块空间拿出来往里头填值,填好值之后消费者就可以往里头消费了,消费完了生产者就可以继续往里头生产了,如果说你生产者消费的比较快,消费着消费的比较慢,满了怎么办,就是用各种各样的等待策略,消费者出了问题之后可以用ExceptionHandler来进行处理。

举报

相关推荐

0 条评论