0
点赞
收藏
分享

微信扫一扫

消息队列和发布订阅

编程语言集成了发布订阅

很多编程语言框架里都提供了发布订阅的组件,或者叫事件处理机制,而spring框架对这个功能也有支持,主要使用​​EventListener​​​实现订阅,使用​​ApplicationEventPublisher​​使用发布。这种系统集成的我们先叫它“集成组件”

与语言无关的消息队列

事实上,发布订阅真的与开发语言没有什么关系,所以出现了另一种产品,消息中间件,或者叫消息队列,它是以发布订阅模式为理论基础的,同时很多消息队列产品又有自己的特色,这种独立的消息队列我们为​​rabbitmq​​为例子。

共同点

  1. 代码解耦,发布者与订阅者可以互不关心
  2. 异步处理,集成组件有的是同步的,需要加​​@Async​​注解
  3. 消息安全

不同点

  1. rabbitmq实现的是多服务之间的发布与订阅
  2. 集成组件实现的是一个服务内部的发布与订阅
  3. rabbitmq是异步的,集成组件可以是异步,也可以是同步
  4. rabbitmq可以有广播,点对点等模式,而集成组件只有广播模式

基于以上的介绍,主要帮助大家理解和认识,在什么时候用什么类型的工具。

实例

  • 集成组件的发布订阅

订阅

@Getter
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
public class CreateBookEvent {
private String address;
private String title;
}

@Component
public class EmailEventListener {
@EventListener
@Async
public void handleEvent(CreateBookEvent event) throws Exception {
System.out.println("email消息:建立图书:" + event.getTitle());
}
}

发布

@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void publish(){
applicationEventPublisher.publishEvent(CreateBookEvent.builder().address("system").title("新建图书").build());
}

  • rabbitmq的发布订阅

订阅

@Slf4j
@Component
public class DistributorSubscriber {
public static final String WORK_QUEUE = "fx.activity.total";
public static final String EXCHANGE = "fx.exchange";
@Autowired
DistributorActivityTotalRepository distributorActivityTotalRepository;
@Autowired
ObjectMapper objectMapper;

@Bean
public TopicExchange phoneTotalExchange() {
return new TopicExchange(EXCHANGE);
}

@Bean
public Queue phoneTotalQueue() {
return new Queue(WORK_QUEUE);
}

@Bean
public Binding bindSignQueue() {
return BindingBuilder.bind(phoneTotalQueue()).to(phoneTotalExchange()).with(WORK_QUEUE);
}
@RabbitListener(queues = WORK_QUEUE)
public void phoneTotalQueueListener(String data) {
try {
logger.debug("fx.activity.total:{}", data);
DistributorActivityTotal entity =
objectMapper.readValue(data, DistributorActivityTotal.class);
distributorActivityTotalRepository.incUpdate(entity);
} catch (Exception ex) {
logger.error("fx.activity.total.error", ex);
}
}

发布

  @Autowired
private RabbitTemplate rabbitTemplate;

public void modifySalesperson(SalesPersonDTO salesPersonDTO) {
try {
rabbitTemplate.convertAndSend(
"EXCHANGE",
"MQName",
objectMapper.writeValueAsString(salesPersonDTO)
);
logger.debug("Enter {},message:{}", "modifySalesperson", salesPersonDTO.toString());

} catch (Exception ex) {
logger.error("MQ.modifySalesperson.error", ex);
}
}

作者:仓储大叔,张占岭,
荣誉:微软MVP

举报

相关推荐

0 条评论