0
点赞
收藏
分享

微信扫一扫

【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的事件总线EventBus服务功能

事件总线 — EventBus

EventBus是一种轻量级的事件发布/订阅框架,它可以帮助开发者在不同组件之间传递消息,从而实现组件之间的解耦。在本文中,我们将介绍EventBus的基本原理、使用方法以及一些最佳实践。

EventBus的基本原理

EventBus的基本原理是发布/订阅模式。在这种模式下,组件之间不直接通信,而是通过一个中介者(即EventBus)来传递消息。具体来说,组件可以向EventBus发布事件,其他组件可以订阅这些事件并在事件发生时做出响应。

EventBus的核心是事件和事件处理器。事件是一个普通的Java对象,它包含了需要传递的信息。事件处理器是一个方法,它用于处理特定类型的事件。当一个事件被发布到EventBus时,EventBus会自动将事件分发给所有订阅了该事件类型的事件处理器。

总体架构图

下面就是我采用了三种不同的框架所实现的的事件总线框架的一个架构图。

【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的事件总线EventBus服务功能_事件监听器

主要有基于spring和Guava、Disruptor所实现的事件总线框架。

定义事件监听器EventListener接口

首先,我们先定义一个专属的事件监听器EventListener接口,主要包含了两个方法。

public interface EventListener<T>{
    String topic();
    void onMessage(T message);
}

方法 — topic

主要用于进行定义对应的topic的消费主题模型,用于匹配对应的主题的消费者进行事件的消费。从而实现了对应的订阅/发布模式的运作机制。

String topic();

方法 — onMessage

主要用消费消息的回调入口方法,从这个角度来讲,是不是非常相像于mq的一个结构模型。

void onMessage(T message);

定义事件驱动传输模型

主要通过一个数据模型进行传递对应的事件传输对象,包含了一部分数据信息模型,以及对应的topic主题信息数据:EventModel,如下所示。

@NoArgsConstructor
@Data
@SuppressWarnings("all")
@ApiModel(value="事件驱动模型")
public class EventModel<T> implements Serializable {

    @ApiModelProperty(value="事件发布主题",name="事件发布主题")
    private String topic;

    @ApiModelProperty(value="事件对象模型",name="事件对象模型")
    private T entity;
}

大家可以看到这两个属性其实分别对应的就是我们的选择性的一个主题订阅以及结果请求的一个消息体。

定义事件监听器EventListenerRegistry接口

主要定义事件总线服务组件,包含了对应的注册各个类型的消费者监听器,以及发布对应的事件数据信息等功能。

public interface EventListenerRegistry<P> {
    void initRegistryEventListener(List<EventListener> eventConsumerList);
    void publish(P param);
}

initRegistryEventListener注册事件监听者

初始化注册事件监听器,主要用于注册EventListener的事件监听实现类。

void initRegistryEventListener(List<EventListener> eventConsumerList);

publish事件发布操作

作为发布事件操作,进行发布对应的数据模型,到对应的事件监听器中,最后绑定到全局的事件总线服务中。

void publish(P param);

EventListener监听器的抽象类ExecutableEventListener

BaseEventListener作为模板方法机制,进行控制封装统一的功能实现机制,将进行下游子类实现的方法进行无缝衔接。

@Slf4j
@Component
public abstract class ExecutableEventListener implements EventListener<EventModel<?>>, EventHandler<EventModel<?>> {
  
    @org.springframework.context.event.EventListener
    @Subscribe
    @Override
    public void onMessage(EventModel<?> message) {
        log.info("系统监听明细执行启动服务监听器:{}",message);
        if(topic().equals(message.getTopic())) {
            handle(message);
        }
    }

    @Override
    public void onEvent(EventModel<?> objectEventModel, long l, boolean b) throws Exception {
        onMessage(objectEventModel);
    }
    /**
     * 操作处理机制控制!
     * @param message
     */
    public abstract void handle(EventModel message);
}

此处重点介绍和分析protected abstract void handle(EventModel message);方法,此方法主要用于分析和回调对应的onEvent事件监听机制。

实现不同类型的事件监听器注册器

主要为了进行负责进行实现注册不同种类型或者事件的监听器的工具类,如下图所示三种实现类。

【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的事件总线EventBus服务功能_事件监听器_02

主要有Disruptor、guava以及Spring自身的事件监听注册器。对此我们分别进行实现对应的注册器。

Guava版本的监听器注册器 — GuavaEventListenerRegistry

首先是我们的瓜娃所实现的视线总线框架的一个组件体系,下面是一个完整的代码。

@Slf4j
@Component("guava")
public class GuavaEventListenerRegistry implements EventListenerRegistry<EventModel> {

    EventBus eventBus;

    final List<EventListener> eventListeners;

    /**
     * 构造器方式的注入方式
     * @param eventListeners
     */
    public GuavaEventListenerRegistry(List<EventListener> eventListeners) {
        this.eventListeners = eventListeners;
    }

    @PostConstruct
    public void init(){
        log.info("开始初始化Guava事件监听器的组件服务");
        initRegistryEventListener(eventListeners);
        log.info("完成初始化Guava事件监听器的组件服务");
    }
    /**
     * 注册监听器操作
     * @param eventConsumerList
     */
    @Override
    public void initRegistryEventListener(List<EventListener> eventConsumerList) {
        Executor executor = ThreadUtil.newExecutor(10,20,300);
        eventBus = new AsyncEventBus(GuavaEventListenerRegistry.class.getName(),executor);
        eventConsumerList.stream().forEach(param->{
            log.info("注册服务监听器:{}",param.getClass());
            eventBus.register(ClassUtils.newInstance(param.getClass()));
        });
    }
    /**
     * 发布事件操作
     * @param param
     */
    @Override
    public void publish(EventModel param) {
        eventBus.post(param);
    }
}

接下来呢我们会针对于整体代码进行拆解,进行分析每一部分的功能的意义,

引入Guava事件监听组件

引入了Guava的事件EventBus服务的组件机制,以及Guava容器注册器内部的监听器集合。

EventBus eventBus;
final List<EventListener> eventListeners;

  • EventBus: 他是刮花框架的核心组件,也是我们整个Guava所实现事件总线的一个核心基础,主要作为我们进行发布时间以及消费事件的基础。
  • eventListeners:这个list结合作为我们Guava去注册以及消费监控的消息监听器的结合,主要存放哪些监听Listener类会被Guava所管理的容器集合。
初始化和实例化Guava容器注册器

/**
     * 构造器方式的注入方式
     * @param eventListeners
     */
    public GuavaEventListenerRegistry(List<EventListener> eventListeners) {
        this.eventListeners = eventListeners;
    }    
    @PostConstruct
    public void init(){
        log.info("开始初始化Guava事件监听器的组件服务");
        initRegistryEventListener(eventListeners);
        log.info("完成初始化Guava事件监听器的组件服务");
    }

因为我们采用的是Spring框架的构造器方式的注入方式,它会自动将EventListener的实现类接口进行注入到我们的集合中,进行赋值即可。

此外,因为有@PostConstruct注解进行修饰,Spring框架会在bean对象初始化的时候进行自动调用init的方法。内部会进行调用注册监听器类对象。

initRegistryEventListener

/**
     * 注册监听器操作
     * @param eventConsumerList
     */
    @Override
    public void initRegistryEventListener(List<EventListener> eventConsumerList) {
        Executor executor = ThreadUtil.newExecutor(10,20,300);
        eventBus = new AsyncEventBus(GuavaEventListenerRegistry.class.getName(),executor);
        eventConsumerList.stream().forEach(param->{
            log.info("注册服务监听器:{}",param.getClass());
            eventBus.register(ClassUtils.newInstance(param.getClass()));
        });
    }

上面就是主要通过监听接口进行注册到EventBus核心对象模型(AsyncEventBus)中(eventBus.register),将我们注入监听器Listener类对象进行注册到我们的容器中。

引入Spring事件监听组件

主要通过spring的框架进行实现对应的事件发布的框架的总线控制机制,框架的基本结构和Guava的很相似。

@Slf4j
@Component("spring")
public class SpringEventListenerRegistry implements EventListenerRegistry<EventModel> {

    @Autowired
    ApplicationContext applicationContext;

    final List<EventListener> eventListeners;

    public SpringEventListenerRegistry(List<EventListener> eventListeners) {
        this.eventListeners = eventListeners;
    }
    @PostConstruct
    public void init(){
        log.info("开始初始化Spring事件监听器的组件服务");
        initRegistryEventListener(eventListeners);
        log.info("完成初始化Spring事件监听器的组件服务:{}",eventListeners);
    }
    @Override
    public void initRegistryEventListener(List<EventListener> eventConsumerList) {

    }
    @Override
    public void publish(EventModel param) {
        applicationContext.publishEvent(param);
    }
}

主要核心属性参数

@Autowired
ApplicationContext applicationContext;
final List<EventListener> eventListeners;

ApplicationContext

主要用于去处理和发布事件的多播器门面处理模式,主要用于上下文进行处理发布事件使用。

@Override
public void publish(EventModel param) {
	applicationContext.publishEvent(param);
}

从上面的代码,我们可以看出来我们是采用了Spring的应用上下文进行直接发布时间。

List<EventListener>

主要用于处理存储所有相关的容器内部的事件驱动监听器对象集合。

public SpringEventListenerRegistry(List<EventListener> eventListeners) {
   this.eventListeners = eventListeners;
}

从上面的源代码可以看出来,属于空方法处理模式,主要是由于因为我们建立的所有的对象模型都是从属于Spring容器内部。

@Override
public void initRegistryEventListener(List<EventListener> eventConsumerList) {}

大家应该会比较好奇,为什么属于空方法呢?因为主要是由于Spring框架本身就将所有的监听器给我们注入进入容器了,所以我们自然就不需要执行该方法了。

引入Disruptor事件监听组件

比起Spring容器和Guava的容器,Disruptor容器对于构建对象的模式,无法被容器进行管理,他需要特殊的建立Factory工厂类进行构建工厂对象,建立EventModelFactory工厂类,如下所示:

public class EventModelFactory<T> implements EventFactory<EventModel<T>> {
    @Override
    public EventModel<T> newInstance() {
        return new EventModel<>();
    }
}

定义事件工厂 事件工厂(Event Factory)定义了如何实例化前面第1步中定义的事件(Event),需要实现接口 com.lmax.disruptor.EventFactory 。 Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例。 一个 Event 实例实际上被用作一个“数据槽”,发布者发布前,先从 RingBuffer 获得一个 Event 的实例, 然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据。

DisruptorEventListenerRegistry

此类主要是负责进行处理Disruptor对象组件机制,可以实现对应的Disruptor框架的事件发布处理模型机制,实现起来较为复杂。

@Slf4j
@Scope("prototype")
@Component("disruptor")
public class DisruptorEventListenerRegistry implements AutoCloseable, EventListenerRegistry<EventModel> {

    /**
     * disruptor事件处理器
     */
    @Setter
    @Getter
    private Disruptor<EventModel> disruptor;
   
    /**
     * 事件处理链表
     */
    @NonNull
    final List<EventListener> eventHandlers;


    public Translator TRANSLATOR = new Translator();

    /**
     * RingBuffer 大小,必须是 2 的 N 次方;
     */
    private final int DEFAULT_RING_BUFFER_SIZE = 1024 * 1024;

    /**
     * 事件工厂类
     */
    private EventFactory<EventModel> eventFactory = new EventModelFactory();

    /**
     * 线程工厂类
     */
   // private ThreadFactory threadFactory =  r -> new Thread(r,"EventModelManager"+System.currentTimeMillis());

    public DisruptorEventListenerRegistry(@NonNull List<EventListener> eventHandlers) {
        this.eventHandlers = eventHandlers;
    }

    /**
     * EventFactory构造器服务机制
     */
    @SuppressWarnings("all")
    @PostConstruct
    public void init(){
        log.info("开始初始化Disruptor事件监听器的组件服务");
        initRegistryEventListener(eventHandlers);
        log.info("完成初始化Disruptor事件监听器的组件服务");
    }

    @Override
    public void initRegistryEventListener(List<EventListener> eventConsumerList) {
// 构造器实际线程池
        disruptor = new Disruptor<>(eventFactory, DEFAULT_RING_BUFFER_SIZE,
                createThreadFactory(), ProducerType.SINGLE, new BlockingWaitStrategy());
        EventHandler[] dataListener = eventConsumerList.stream().
                map(param->{
            EventListener<EventModel> eventModelEventListener = param;
            return eventModelEventListener;
        }).collect(Collectors.toList()).toArray(new EventHandler[eventConsumerList.size()]);
        log.info("注册服务信息接口:{}",dataListener);
        disruptor.handleEventsWith(dataListener);
        //顾名思义:执行完t1后执行t2。(对同一个任务线性执行)
//        disruptor.after(t1).handleEventsWith(t2)。
        disruptor.start();
    }

    @Override
    public void publish(EventModel param) {
        publishEvent(param);
    }

    /**
     * publish 发布事件数据对象模型
     */
    @SuppressWarnings("all")
    public void publishEvent(EventModel... eventModels){
        Objects.requireNonNull(disruptor,"当前disruptor核心控制器不可以为null!");
        Objects.requireNonNull(eventModels,"当前eventModel事件控制器不可以为null!");
        // 发布事件;
        RingBuffer ringBuffer = disruptor.getRingBuffer();
        try {
            //获取要通过事件传递的业务数据;
            List<EventModel> dataList = Arrays.stream(eventModels).
                    collect(Collectors.toList());
            for(EventModel element:dataList){
                //请求下一个事件序号;
                long sequence = ringBuffer.next();
                //获取该序号对应的事件对象;
                EventModel event = (EventModel) ringBuffer.get(sequence);
                event.setTopic(element.getTopic());
                event.setEntity(element.getEntity());
                ringBuffer.publish(sequence);
            }
        }catch (Exception e){
            log.error("error",e);
        };
    }

    /**
     * 关闭操作处理机制
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        if(Objects.nonNull(disruptor)){
            disruptor.shutdown();
        }
    }

    /**
     * 转换器模型
     */
    public class Translator implements EventTranslatorOneArg<EventModel, EventModel> {
        @Override
        public void translateTo(EventModel event, long sequence, EventModel data) {
        }
    }

    /**
     * 发送事件模型
     */
    @SuppressWarnings("all")
    public void sendEvent(EventModel... events){
       // 注意,最后的 ringBuffer.
       // publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
       // Disruptor 还提供另外一种形式的调用来简化以上操作,并确保 publish 总是得到调用。
        RingBuffer ringBuffer = disruptor.getRingBuffer();
        //获取要通过事件传递的业务数据;
        for(EventModel event:events){
            ringBuffer.publishEvent(TRANSLATOR,event);
        }
    }


    /**
     * 线程工厂类
     * @return
     */
    public static ThreadFactory createThreadFactory(){
        AtomicInteger integer = new AtomicInteger();
        return  r ->
            new Thread(r,"disruptor-"+integer.incrementAndGet());
    }

    /**
     * 创建执行器
     * @return
     */
    public static Executor createExecutor(){
        return  ExecutorBuilder.create().setCorePoolSize(Runtime.getRuntime().availableProcessors()).
                setMaxPoolSize(100).setKeepAliveTime(60, TimeUnit.SECONDS).setWorkQueue(new ArrayBlockingQueue(200)).
                setThreadFactory(createThreadFactory()).setHandler(new
                ThreadPoolExecutor.DiscardOldestPolicy()).build();
    }
}

核心数据属性

主要核心业务属性参数

专属的disruptor事件处理器

@Setter
@Getter
private Disruptor<EventModel> disruptor;

专属的List监听器处理机制

/**
 * 事件处理链表
 */
@NonNull
final List<EventListener> eventHandlers;

Disruptor的专有属性

RingBuffer 大小,必须是 2 的 N 次方

private final int DEFAULT_RING_BUFFER_SIZE = 1024 * 1024;

事件工厂类

/**
 * 
 */
private EventFactory<EventModel> eventFactory = new EventModelFactory();

此事。件工厂主要负责去构建我们的数据驱动的模型工厂类,也就是创建对应的数据模型EventModel,具体的实现上面已经介绍了。此处不多做赘述。

initRegistryEventListener注册事件驱动监听器

此方法主要用于进行注册我们的监听器到DIsruptor的容器当中。

@Override
    public void initRegistryEventListener(List<EventListener> eventConsumerList) {
// 构造器实际线程池
        disruptor = new Disruptor<>(eventFactory, DEFAULT_RING_BUFFER_SIZE,
                createThreadFactory(), ProducerType.SINGLE, new BlockingWaitStrategy());
        EventHandler[] dataListener = eventConsumerList.stream().
                map(param->{
            EventListener<EventModel> eventModelEventListener = param;
            return eventModelEventListener;
        }).collect(Collectors.toList()).toArray(new EventHandler[eventConsumerList.size()]);
        log.info("注册服务信息接口:{}",dataListener);
        disruptor.handleEventsWith(dataListener);
        //顾名思义:执行完t1后执行t2。(对同一个任务线性执行)
//        disruptor.after(t1).handleEventsWith(t2)。
        disruptor.start();
    }

  1. 构建初始化Disruptor核心门面组件

disruptor = new Disruptor<>(eventFactory, DEFAULT_RING_BUFFER_SIZE,
                createThreadFactory(), ProducerType.SINGLE, new BlockingWaitStrategy());

  1. 将我们统一标准化的监听器Listener转换为Disruptor可以识别的EventHandler事件处理器。
  2. ** disruptor.handleEventsWith(dataListener);**
  • 绑定上一步生成的EventHandler事件处理器。
  1. 启动我们的disruptor框架的容器机制
publishEvent发布事件

@SuppressWarnings("all")
    public void publishEvent(EventModel... eventModels){
        Objects.requireNonNull(disruptor,"当前disruptor核心控制器不可以为null!");
        Objects.requireNonNull(eventModels,"当前eventModel事件控制器不可以为null!");
        // 发布事件;
        RingBuffer ringBuffer = disruptor.getRingBuffer();
        try {
            //获取要通过事件传递的业务数据;
            List<EventModel> dataList = Arrays.stream(eventModels).
                    collect(Collectors.toList());
            for(EventModel element:dataList){
                //请求下一个事件序号;
                long sequence = ringBuffer.next();
                //获取该序号对应的事件对象;
                EventModel event = (EventModel) ringBuffer.get(sequence);
                event.setTopic(element.getTopic());
                event.setEntity(element.getEntity());
                ringBuffer.publish(sequence);
            }
        }catch (Exception e){
            log.error("error",e);
        };
    }

此部分功能主要还就是Disruptor框架的开发模式,我们将发布的事件模型转换为EventModel之后,并且注入对应的topic和实体传输对象后,即可被下游的EventHandler进行处理即可。

总结

EventBus是一种轻量级的事件发布/订阅框架,它可以帮助开发者实现组件之间的解耦。在使用EventBus时,需要定义事件、注册事件处理器、订阅事件和发布事件。同时,应该选择合适的线程模式、避免滥用EventBus以及使用粘性事件。

举报

相关推荐

0 条评论