0
点赞
收藏
分享

微信扫一扫

Spring Boot集成disruptor快速入门demo

天行五煞 04-03 19:30 阅读 1

1.disruptor介绍

什么是 Disruptor?

  • Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能的并发框架。可以认为是线程间通信的高效低延时的内存消息组件,它最大的特点是高性能。与 Kafka、RabbitMQ 用于服务间的消息队列不同,disruptor 一般用于一个 JVM 中多个线程间消息的传递。

  • 从功能上来看,Disruptor 实现了“队列”的功能,而且是一个有界队列(事实上它是一个无锁的线程间通信库)。作用与 ArrayBlockingQueue 有相似之处,但是 disruptor 从功能、性能上又都远好于 ArrayBlockingQueue。

Disruptor 的优势

  • Disruptor 最直接的应用场景自然就是“生产者-消费者”模型的应用场合了,虽然这些我们使用 JDK 的 BlockingQueue 也能做到,但 Disruptor 的性能比 BlockingQueue 提高了 5~10 倍左右:

  • 也就是说 BlockingQueue 能做的,Disruptor 都能做到且做的更好。同时 Disruptor 还能做得更多:

2.代码工程

实验目标:利用disruptor发送和接收消息

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>


    <artifactId>disruptor</artifactId>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>


    </dependencies>
</project>

application.yaml

server:
  port: 8088

model

package com.et.disruptor.model;


import lombok.Data;


@Data
public class MessageModel {
    private String message;
}

consumer

package com.et.disruptor.event;


import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
    @Override
    public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
        try {


            Thread.sleep(1000);
            log.info("consume message start");
            if (event != null) {
                log.info("the message is:{}",event);
            }
        } catch (Exception e) {
            log.info("consume message fail");
        }
        log.info("consume message ending");
    }
}

producer

package com.et.disruptor.event;


import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;


/**
 * @author liuhaihua
 * @version 1.0
 * @ClassName HelloEventProducer
 * @Description todo
 * @date 2024年03月29日 13:26
 */
@Component
public class HelloEventProducer {
    @Autowired
    RingBuffer<MessageModel> ringBuffer;
    public synchronized void sayHelloMq(String message){
        EventTranslator<MessageModel> et = new EventTranslator<MessageModel>() {
            @Override
            public void translateTo(MessageModel messageModel, long l) {
                messageModel.setMessage(message);
            }
        };
        ringBuffer.publishEvent(et);
    }


}

HelloEventFactory

package com.et.disruptor.event;


import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.EventFactory;


public class HelloEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}

config类

package com.et.disruptor.config;


import com.et.disruptor.event.HelloEventFactory;
import com.et.disruptor.event.HelloEventHandler;
import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


@Configuration
public class MqManager {
    @Bean("ringBuffer")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        //define the thread pool for consumer message hander, Disruptor touch the consumer event to process by java.util.concurrent.ExecutorSerivce
        ExecutorService executor = Executors.newFixedThreadPool(2);
        //define Event Factory
        HelloEventFactory factory = new HelloEventFactory();
        //ringbuffer byte size
        int bufferSize = 1024 * 256;
        //单线程模式,获取额外的性能
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());
        //set consumer event
        disruptor.handleEventsWith(new HelloEventHandler());
        //start disruptor thread
        disruptor.start();
        //gain ringbuffer ring,to product event
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
        return ringBuffer;
    }
}

controller

package com.et.disruptor.controller;


import com.et.disruptor.event.HelloEventProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;


import java.util.HashMap;
import java.util.Map;


@Controller
public class HelloWorldController {
    @Autowired
    HelloEventProducer helloEventProducer;
    @RequestMapping("/hello")
    @ResponseBody
    public Map<String, Object> showHelloWorld(){
        Map<String, Object> map = new HashMap<>();
        map.put("msg", "HelloWorld");
        return map;
    }
    @RequestMapping("/send")
    @ResponseBody
    public String  add(String message){
        helloEventProducer.sayHelloMq(message);
        return "success";
    }


}

启动类

package com.et.disruptor;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class DemoApplication {


   public static void main(String[] args) {
      SpringApplication.run(DemoApplication.class, args);
   }
}

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • https://github.com/Harries/springboot-demo

3.测试

  1. 启动Spring Boot应用服务

  2. 浏览器输入http://127.0.0.1:8088/send?message=hello%20world

  3. 控制台输出日志

2024-03-29 14:00:54.828 INFO 22784 --- [pool-1-thread-1] c.et.disruptor.event.HelloEventHandler : consume message start
2024-03-29 14:00:54.828 INFO 22784 --- [pool-1-thread-1] c.et.disruptor.event.HelloEventHandler : the message is:MessageModel(message=hello world)
2024-03-29 14:00:54.831 INFO 22784 --- [pool-1-thread-1] c.et.disruptor.event.HelloEventHandler : consume message ending

4.引用

  • https://github.com/LMAX-Exchange/disruptor

  • http://www.liuhaihua.cn/archives/710370.html

  • https://www.jb51.net/article/274145.htm

举报

相关推荐

0 条评论