Cannot find a @StreamListener matching for message with id: 99a2a40a-fcf9-800a-384d-e3782846c0ed

白衣蓝剑冰魄 2022-01-28

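RocketMQ reports the error Cannot find a @StreamListener matching for message with id: 99a2a40a-fcf9-800a-384d-e3782846c0ed

Cause:

After integrating RocketMQ with Spring Cloud Stream, the code declares a listener with a condition on the tag header: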

@StreamListener(value = OrderEventSink.ORDER_STATUS_CHANGED_EVENT_INPUT, condition = "headers['rocketmq_TAGS']=='ORDER'")
    public void handleOrderStatusChangedEvent(Message<OrderStatusChangedEvent> message) {}

but the configuration file does not configure the corresponding tag filter:

spring:
  cloud:
    stream:
      bindings:
        orderStatusChangedEventInput:
          destination: order_status_updated_event_topic
      # the following section was not configured
      rocketmq:
        bindings:
          orderStatusChangedEventInput:
            consumer:
              tags: ORDER

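When a message is delivered to the service but is then filtered out by the listener condition, this error is reported.

// The message is logged in org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler#handleRequestMessage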

protected Object handleRequestMessage(Message<?> requestMessage) {
        List<DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper> matchingHandlers = this.evaluateExpressions ? this.findMatchingHandlers(requestMessage) : this.handlerMethods;
        if (matchingHandlers.size() == 0) {
            if (this.logger.isWarnEnabled()) {
                // This is where the message is logged
                this.logger.warn("Cannot find a @StreamListener matching for message with id: " + requestMessage.getHeaders().getId());
            }

            return null;
        } else if (matchingHandlers.size() <= 1) {
            DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper singleMatchingHandler = (DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper)matchingHandlers.get(0);
            singleMatchingHandler.getStreamListenerMessageHandler().handleMessage(requestMessage);
            return null;
        } else {
            Iterator var3 = matchingHandlers.iterator();

            while(var3.hasNext()) {
                DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper matchingMethod = (DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper)var3.next();
                matchingMethod.getStreamListenerMessageHandler().handleMessage(requestMessage);
            }

            return null;
        }
    }
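
The zero-match branch above can be reproduced with a simplified model. This is only a sketch in plain Java, not the Spring class: `DispatchSketch`, its `dispatch` method, and the `PAY` tag are hypothetical names chosen for illustration, and each listener condition is modelled as a predicate over the message headers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, simplified model of condition-based dispatch (not the Spring class).
class DispatchSketch {

    // Returns the names of all listeners whose condition accepts the headers.
    static List<String> dispatch(Map<String, Predicate<Map<String, Object>>> listeners,
                                 Map<String, Object> headers) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Predicate<Map<String, Object>>> entry : listeners.entrySet()) {
            if (entry.getValue().test(headers)) {
                matched.add(entry.getKey());
            }
        }
        if (matched.isEmpty()) {
            // Same situation as in the post: the message arrived, but no condition matched.
            System.out.println("Cannot find a listener matching for message with id: "
                    + headers.get("id"));
        }
        return matched;
    }

    public static void main(String[] args) {
        Map<String, Predicate<Map<String, Object>>> listeners = new LinkedHashMap<>();
        listeners.put("handleOrderStatusChangedEvent",
                headers -> "ORDER".equals(headers.get("rocketmq_TAGS")));

        Map<String, Object> orderMessage = new LinkedHashMap<>();
        orderMessage.put("id", "1");
        orderMessage.put("rocketmq_TAGS", "ORDER");

        Map<String, Object> otherMessage = new LinkedHashMap<>();
        otherMessage.put("id", "2");
        otherMessage.put("rocketmq_TAGS", "PAY"); // hypothetical non-matching tag

        System.out.println(dispatch(listeners, orderMessage)); // one listener matches
        System.out.println(dispatch(listeners, otherMessage)); // no listener matches, message is logged
    }
}
```

In this model the second message reaches the dispatcher but no condition accepts it, which is the case where the message from the post appears.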

Solution 1:

Add the tag filter below to the yml configuration file

spring:
  cloud:
    stream:
      # add the following section
      rocketmq:
        bindings:
          orderStatusChangedEventInput:
            consumer:
              tags: ORDER

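Once this is configured, filtering happens on the RocketMQ broker, so messages with other tags are never delivered to the service; this can be seen in the message view.

Without it, the message is delivered to the service, but because the listener is declared with the header condition

, condition = "headers['rocketmq_TAGS']=='ORDER'"

no matching listener is found and the error is reported.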
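
As a rough illustration of what a tag subscription such as `tags: ORDER` decides, the sketch below matches a message tag against a subscription expression. It assumes the usual RocketMQ conventions that `*` subscribes to every tag and `||` separates alternatives; `TagFilterSketch`, `matches`, and the `PAY` tag are hypothetical names for illustration, not RocketMQ's own code.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper illustrating tag-expression matching (not RocketMQ's implementation).
class TagFilterSketch {

    // Returns true when a message carrying `tag` would pass the subscription expression.
    static boolean matches(String subExpression, String tag) {
        // A missing or empty expression, or "*", accepts every message.
        if (subExpression == null
                || subExpression.trim().isEmpty()
                || "*".equals(subExpression.trim())) {
            return true;
        }
        // "A || B" accepts messages tagged A or B.
        Set<String> wanted = Arrays.stream(subExpression.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return tag != null && wanted.contains(tag);
    }

    public static void main(String[] args) {
        System.out.println(matches("ORDER", "ORDER"));
        System.out.println(matches("ORDER", "PAY"));
        System.out.println(matches("*", "PAY"));
    }
}
```

When the same decision is made on the broker, a message that fails the match never reaches the listener dispatch in the service, so the condition is never evaluated against it.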

Solution 2:

Do not configure tags in yml; keep the condition in code

, condition = "headers['rocketmq_TAGS']=='ORDER'"

and then check the tag yourself:

@StreamListener(value = OrderEventSink.ORDER_STATUS_CHANGED_EVENT_INPUT, condition = "headers['rocketmq_TAGS']=='ORDER'")
    public void handleOrderStatusChangedEvent(Message<OrderStatusChangedEvent> message) {
        // The header may be absent, so compare constant-first to avoid a NullPointerException
        Object tags = message.getHeaders().get("rocketmq_TAGS");
        if (!"ORDER".equals(tags)) {
            return;
        }
        // Your own business logic goes below

    }
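
The in-method tag check can be tried outside Spring with a plain map standing in for the message headers. This is a minimal sketch: `TagCheckSketch` and `isOrderMessage` are hypothetical names, and the `PAY` tag is only an example value. The check only has an effect for messages that actually reach the method, so it matters most when the dispatch step does not already exclude them through `condition`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: a null-safe version of the tag check from the listener body.
class TagCheckSketch {

    static final String TAGS_HEADER = "rocketmq_TAGS";

    // True only when the tags header is present and equals "ORDER".
    static boolean isOrderMessage(Map<String, Object> headers) {
        // Constant-first equals returns false instead of throwing when the header is missing.
        return "ORDER".equals(headers.get(TAGS_HEADER));
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(isOrderMessage(headers)); // header missing

        headers.put(TAGS_HEADER, "ORDER");
        System.out.println(isOrderMessage(headers)); // matching tag

        headers.put(TAGS_HEADER, "PAY"); // hypothetical other tag
        System.out.println(isOrderMessage(headers)); // non-matching tag
    }
}
```

Since Spring's `MessageHeaders` implements `Map<String, Object>`, `message.getHeaders()` could be passed to such a helper directly.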