0
点赞
收藏
分享

微信扫一扫

RocketMQ入门教程(二):SpringBoot集成RocketMQ


使用RocketMQ有两种方式,一种是引入rocketmq-client需要自己创建生产者和消费者,相对来说比较繁琐;另一种是引入rocketmq-spring-boot-starter(对rocketmq-client进行了封装),发消息和消费消息都比较简洁。

一:rocketmq-client 方式集成

1. pom.xml

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>4.8.0</version>
</dependency>

2. applicaton.yml

server:
  port: 8888

rocketmq:
  namesrvaddr: localhost:9876
  producerGroup: testProducerGroup
  consumerGroup: testConsumerGroup

3. 发送消息

@RestController
public class RocketMQController {


    @Value("${rocketmq.namesrvaddr}")
    private String namesrvAddr;

    @Value("${rocketmq.producerGroup}")
    private String producerGroup;

    @Value("${rocketmq.consumerGroup}")
    private String consumerGroup;


    @RequestMapping("/send")
    public String send() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        producer.start();

        Message message = new Message("test-topic", "test-tag", "test message".getBytes());
        SendResult sendResult = producer.send(message);
        System.out.println(sendResult);
        producer.shutdown();

        return "success";
    }
}

RocketMQ入门教程(二):SpringBoot集成RocketMQ_apache

消息发送成功可以在控制台查看消息。

RocketMQ入门教程(二):SpringBoot集成RocketMQ_spring_02

4. 消费消息

@RestController
public class RocketMQController {


    @Value("${rocketmq.namesrvaddr}")
    private String namesrvAddr;

    @Value("${rocketmq.producerGroup}")
    private String producerGroup;

    @Value("${rocketmq.consumerGroup}")
    private String consumerGroup;


    @PostConstruct
    public void consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.subscribe("test-topic", "test-tag");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                try {
                    for (MessageExt messageExt : list) {
                        System.err.println("消费消息: " + new String(messageExt.getBody()));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 稍后再试
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消费成功
            }
        });
        consumer.start();
    }
}

@PostConstruct 表示对象初始化之后就会立即调用该方法,相当于Servlet中的init方法,表示项目一启动就启动消费者。

RocketMQ入门教程(二):SpringBoot集成RocketMQ_spring_03

二:rocketmq-spring-boot-starter 方式集成

使用Spring提供的Template方式要比直接使用client方式要简单很多,不需要再创建生产者和消费者,也不用再显式设置NameServer地址了(只需在配置文件中配置一下),也不用再显式的关闭生产者了。

2.1 pom.xml

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-spring-boot-starter</artifactId>
   <version>2.1.1</version>
</dependency>

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>4.8.0</version>
</dependency>

2.2 application.yml

rocketmq:
  name-server: localhost:9876
  producer:
    group: testProducerGroup

2.3 发送消息

convertAndSend(String destination, Object payload) 发送字符串比较方便。

@RestController
public class RocketMQController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    @RequestMapping("/send")
    public void send() throws Exception {
        rocketMQTemplate.convertAndSend("test-topic", "test message");
    }
}

使用rocketMQTemplate也可以发送org.apache.rocketmq.common.message.Message消息,需要先获取到生产者,然后通过生产者再发送。

import org.apache.rocketmq.common.message.Message;

String id = "1";
Message message = new Message("test-topic", "test-tag", id, "msg boday".getBytes());
rocketMQTemplate.getProducer().send(message);

2.4 消费消息

消费消息使用@RocketMQMessageListener注解,只需要配置消费者组名和主题即可,需要实现RocketMQListener接口。

@RocketMQMessageListener

  • consumerGroup:消费者组名(必须)
  • topic:主题名(必须)
  • consumeMode: 消费模式(是否顺序消费)
  • ConsumeMode.CONCURRENTLY(默认方式,非顺序消费)
  • ConsumeMode.ORDERLY(顺序消费)
  • messageModel:消息模式
  • MessageModel.CLUSTERING(默认方式,集群模式:同一个消息只能被一个消费者消费) ,
  • MessageModel.BROADCASTING(广播模式:所有订阅的消费者都能够获取到消息)

@Component
@RocketMQMessageListener(consumerGroup = "testConsumerGroup", topic = "test-topic")
public class TestTopicConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("消费消息:" + message);
    }
}

RocketMQ入门教程(二):SpringBoot集成RocketMQ_apache_04

RocketMQListener<T> , T如果是String类型就是body, 如果是MessageExt表示的是整个MessageExt对象,MessageExt不但包含body还包含其它元数据。

@Component
@RocketMQMessageListener(consumerGroup = "testConsumerGroup", topic = "test-topic")
public class TestTopicConsumerListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println(messageExt);
        System.out.println("body:" + new String(messageExt.getBody()));
    }
}

RocketMQ入门教程(二):SpringBoot集成RocketMQ_apache_05


举报

相关推荐

0 条评论