0
点赞
收藏
分享

微信扫一扫

#yyds干货盘点#Spring Cloud Stream 消息发送

秀妮_5519 2022-03-11 阅读 116

Spring Cloud Stream 消息发送

image-20211008132856943

source.output().send(message);来发送消息

public interface Source {

   /**
    * Name of the output channel.
    */
   String OUTPUT = "output";

   /**
    * @return output channel
    */
   @Output(Source.OUTPUT)
   MessageChannel output();

}
@FunctionalInterface
public interface MessageChannel {

   long INDEFINITE_TIMEOUT = -1;

   default boolean send(Message<?> message) {
      return send(message, INDEFINITE_TIMEOUT);
   }

   boolean send(Message<?> message, long timeout);

}

AbstractMessageChannel是消息通道的基本实现,提供发送消息和接收消息的公共方法。

消息发送到AbstractSubscribableChannel类的doSend()方法如下:

public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
        implements SubscribableChannel, SubscribableChannelManagement {

    @Override
    public int getSubscriberCount() {
        return getRequiredDispatcher().getHandlerCount();
    }

    @Override
    public boolean subscribe(MessageHandler handler) {
        MessageDispatcher dispatcher = getRequiredDispatcher();
        boolean added = dispatcher.addHandler(handler);
        adjustCounterIfNecessary(dispatcher, added ? 1 : 0);
        return added;
    }

    @Override
    public boolean unsubscribe(MessageHandler handle) {
        MessageDispatcher dispatcher = getRequiredDispatcher();
        boolean removed = dispatcher.removeHandler(handle);
        this.adjustCounterIfNecessary(dispatcher, removed ? -1 : 0);
        return removed;
    }

    private void adjustCounterIfNecessary(MessageDispatcher dispatcher, int delta) {
        if (delta != 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Channel '" + this.getFullChannelName() + "' has " + dispatcher.getHandlerCount()
                        + " subscriber(s).");
            }
        }
    }

    @Override
    protected boolean doSend(Message<?> message, long timeout) {
        try {
            return getRequiredDispatcher().dispatch(message);
        }
        catch (MessageDispatchingException e) {
            String description = e.getMessage() + " for channel '" + this.getFullChannelName() + "'.";
            throw new MessageDeliveryException(message, description, e);
        }
    }

    private MessageDispatcher getRequiredDispatcher() {
        MessageDispatcher dispatcher = getDispatcher();
        Assert.state(dispatcher != null, "'dispatcher' must not be null");
        return dispatcher;
    }

    protected abstract MessageDispatcher getDispatcher();

}

调用getDispatcher方法从DirectChannel中得到消息分发类MessageDispatcher的实现类UnicastingDispatcher

调用dispatch方法把消息分发给各个MessageHandler

UnicastingDispatcher的doDispatch方法:

private boolean doDispatch(Message<?> message) {
   if (tryOptimizedDispatch(message)) {
      return true;
   }
   boolean success = false;
   Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
   if (!handlerIterator.hasNext()) {
      throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
   }
   List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
   while (!success && handlerIterator.hasNext()) {
      MessageHandler handler = handlerIterator.next();
      try {
         handler.handleMessage(message);
         success = true; // we have a winner.
      }
      catch (Exception e) {
         RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
               () -> "Dispatcher failed to deliver Message", e);
         exceptions.add(runtimeException);
         this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
      }
   }
   return success;
}

遍历所有的MessageHandler,调用handleMessage()处理消息,MessageHandler从哪来的呢?

AbstractMessageChannelBinder在初始化Binding时,会创建并初始化SendingHandler,调用subscribe()方法添加到handlers列表。

AbstractMessageChannelBinder的初始化由AbstractBindingLifecycle在Spring容器加载所有Bean并完成初始化之后完成。

RocketMQMessageChannelBinder集成消息发送

AbstractMessageChannelBinder类提供创建MessageHandler规范,createProducerMessageHandler方法在初始化Binder的时候会加载。

RocketMQMessageChannelBinder继承AbstractMessageChannelBinder,完成RocketMQMessageHandler的创建和初始化,RocketMQMessageHandler的消息处理器MessageHandler的具体实现,RocketMQMessageHandler在RocketMQBinder中的作用就是转化消息格式并发送消息。

RocketMQMessageChannelBinder的createProducerMessageHandler方法:

@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
      ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
      MessageChannel channel, MessageChannel errorChannel) throws Exception {
   if (producerProperties.getExtension().getEnabled()) {

      // if producerGroup is empty, using destination
      String extendedProducerGroup = producerProperties.getExtension().getGroup();
      String producerGroup = StringUtils.isEmpty(extendedProducerGroup)
            ? destination.getName()
            : extendedProducerGroup;

      RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils
            .mergeProperties(rocketBinderConfigurationProperties,
                  rocketMQProperties);

      RocketMQTemplate rocketMQTemplate;
      if (producerProperties.getExtension().getTransactional()) {
         Map<String, RocketMQTemplate> rocketMQTemplates = getBeanFactory()
               .getBeansOfType(RocketMQTemplate.class);
         if (rocketMQTemplates.size() == 0) {
            throw new IllegalStateException(
                  "there is no RocketMQTemplate in Spring BeanFactory");
         }
         else if (rocketMQTemplates.size() > 1) {
            throw new IllegalStateException(
                  "there is more than 1 RocketMQTemplates in Spring BeanFactory");
         }
         rocketMQTemplate = rocketMQTemplates.values().iterator().next();
      }
      else {
         rocketMQTemplate = new RocketMQTemplate();
         rocketMQTemplate.setObjectMapper(this.getApplicationContext()
               .getBeansOfType(ObjectMapper.class).values().iterator().next());
          //初始化DefaultMQProducer
         DefaultMQProducer producer;
         String ak = mergedProperties.getAccessKey();
         String sk = mergedProperties.getSecretKey();
         if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
            RPCHook rpcHook = new AclClientRPCHook(
                  new SessionCredentials(ak, sk));
            producer = new DefaultMQProducer(producerGroup, rpcHook,
                  mergedProperties.isEnableMsgTrace(),
                  mergedProperties.getCustomizedTraceTopic());
            producer.setVipChannelEnabled(false);
            producer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
                  destination.getName() + "|" + UtilAll.getPid()));
         }
         else {
            producer = new DefaultMQProducer(producerGroup);
            producer.setVipChannelEnabled(
                  producerProperties.getExtension().getVipChannelEnabled());
         }
         producer.setNamesrvAddr(mergedProperties.getNameServer());
         producer.setSendMsgTimeout(
               producerProperties.getExtension().getSendMessageTimeout());
         producer.setRetryTimesWhenSendFailed(
               producerProperties.getExtension().getRetryTimesWhenSendFailed());
         producer.setRetryTimesWhenSendAsyncFailed(producerProperties
               .getExtension().getRetryTimesWhenSendAsyncFailed());
         producer.setCompressMsgBodyOverHowmuch(producerProperties.getExtension()
               .getCompressMessageBodyThreshold());
         producer.setRetryAnotherBrokerWhenNotStoreOK(
               producerProperties.getExtension().isRetryNextServer());
         producer.setMaxMessageSize(
               producerProperties.getExtension().getMaxMessageSize());
         rocketMQTemplate.setProducer(producer);
         if (producerProperties.isPartitioned()) {
            rocketMQTemplate
                  .setMessageQueueSelector(new PartitionMessageQueueSelector());
         }
      }

      RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
            rocketMQTemplate, destination.getName(), producerGroup,
            producerProperties.getExtension().getTransactional(),
            instrumentationManager, producerProperties,
            ((AbstractMessageChannel) channel).getChannelInterceptors().stream()
                  .filter(channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor)
                  .map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor))
                  .findFirst().orElse(null));
      messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
      messageHandler.setSync(producerProperties.getExtension().getSync());
      messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
      if (errorChannel != null) {
         messageHandler.setSendFailureChannel(errorChannel);
      }
      return messageHandler;
   }
   else {
      throw new RuntimeException("Binding for channel " + destination.getName()
            + " has been disabled, message can't be delivered");
   }
}

RocketMQMessageHandler中持有RocketMQTemplate对象,RocketMQTemplate是对RocketMQ客户端API的封装

DefaultMQProducer由RocketMQ客户端提供的API,发送消息到RocketMQ消息服务器都是由它来完成。

RocketMQMessageHandler是消息发送的处理逻辑,解析Message对象头中的参数,调用RocketMQTemplate中不同的发送消息接口。

RocketMQMessageHandler的handleMessageInternal方法:

protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
      throws Exception {
   try {
      // issue 737 fix
      Map<String, String> jsonHeaders = headerMapper
            .fromHeaders(message.getHeaders());
      message = org.springframework.messaging.support.MessageBuilder
            .fromMessage(message).copyHeaders(jsonHeaders).build();

      final StringBuilder topicWithTags = new StringBuilder(destination);
      String tags = Optional
            .ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
            .toString();
      if (!StringUtils.isEmpty(tags)) {
         topicWithTags.append(":").append(tags);
      }

      SendResult sendRes = null;
       //发送事务消息
      if (transactional) {
         sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,
               topicWithTags.toString(), message, message.getHeaders()
                     .get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG));
         log.debug("transactional send to topic " + topicWithTags + " " + sendRes);
      }
      else {
          //设置定时消息参数
         int delayLevel = 0;
         try {
            Object delayLevelObj = message.getHeaders()
                  .getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
            if (delayLevelObj instanceof Number) {
               delayLevel = ((Number) delayLevelObj).intValue();
            }
            else if (delayLevelObj instanceof String) {
               delayLevel = Integer.parseInt((String) delayLevelObj);
            }
         }
         catch (Exception e) {
            // ignore
         }
         boolean needSelectQueue = message.getHeaders()
               .containsKey(BinderHeaders.PARTITION_HEADER);
          //同步发送
         if (sync) {
             //顺序消息
            if (needSelectQueue) {
               sendRes = rocketMQTemplate.syncSendOrderly(
                     topicWithTags.toString(), message, "",
                     rocketMQTemplate.getProducer().getSendMsgTimeout());
            }
             //普通消息
            else {
               sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(),
                     message,
                     rocketMQTemplate.getProducer().getSendMsgTimeout(),
                     delayLevel);
            }
            log.debug("sync send to topic " + topicWithTags + " " + sendRes);
         }
          //异步消息
         else {
            Message<?> finalMessage = message;
            SendCallback sendCallback = new SendCallback() {
               @Override
               public void onSuccess(SendResult sendResult) {
                  log.debug("async send to topic " + topicWithTags + " "
                        + sendResult);
               }

               @Override
               public void onException(Throwable e) {
                  log.error("RocketMQ Message hasn't been sent. Caused by "
                        + e.getMessage());
                  if (getSendFailureChannel() != null) {
                     getSendFailureChannel().send(
                           RocketMQMessageHandler.this.errorMessageStrategy
                                 .buildErrorMessage(new MessagingException(
                                       finalMessage, e), null));
                  }
               }
            };
            if (needSelectQueue) {
               rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(),
                     message, "", sendCallback,
                     rocketMQTemplate.getProducer().getSendMsgTimeout());
            }
            else {
               rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
                     sendCallback);
            }
         }
      }
      if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
         if (getSendFailureChannel() != null) {
            this.getSendFailureChannel().send(message);
         }
         else {
            throw new MessagingException(message,
                  new MQClientException("message hasn't been sent", null));
         }
      }
   }
   catch (Exception e) {
      log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
      if (getSendFailureChannel() != null) {
         getSendFailureChannel().send(this.errorMessageStrategy
               .buildErrorMessage(new MessagingException(message, e), null));
      }
      else {
         throw new MessagingException(message, e);
      }
   }

}

发送普通消息、事务消息、定时消息还是顺序消息,由Message对象的消息头Header中的属性决定,在业务代码创建Message对象时设置。

举报

相关推荐

0 条评论