使用RocketMQ有两种方式,一种是引入rocketmq-client
需要自己创建生产者和消费者,相对来说比较繁琐;另一种是引入rocketmq-spring-boot-starter
(对rocketmq-client进行了封装),发消息和消费消息都比较简洁。
一:rocketmq-client 方式集成
1. pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
2. applicaton.yml
server:
port: 8888
rocketmq:
namesrvaddr: localhost:9876
producerGroup: testProducerGroup
consumerGroup: testConsumerGroup
3. 发送消息
@RestController
public class RocketMQController {
@Value("${rocketmq.namesrvaddr}")
private String namesrvAddr;
@Value("${rocketmq.producerGroup}")
private String producerGroup;
@Value("${rocketmq.consumerGroup}")
private String consumerGroup;
@RequestMapping("/send")
public String send() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
producer.start();
Message message = new Message("test-topic", "test-tag", "test message".getBytes());
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
producer.shutdown();
return "success";
}
}
消息发送成功可以在控制台查看消息。
4. 消费消息
@RestController
public class RocketMQController {
@Value("${rocketmq.namesrvaddr}")
private String namesrvAddr;
@Value("${rocketmq.producerGroup}")
private String producerGroup;
@Value("${rocketmq.consumerGroup}")
private String consumerGroup;
@PostConstruct
public void consumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(namesrvAddr);
consumer.subscribe("test-topic", "test-tag");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
for (MessageExt messageExt : list) {
System.err.println("消费消息: " + new String(messageExt.getBody()));
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消费成功
}
});
consumer.start();
}
}
@PostConstruct 表示对象初始化之后就会立即调用该方法,相当于Servlet中的init方法,表示项目一启动就启动消费者。
二:rocketmq-spring-boot-starter 方式集成
使用Spring提供的Template方式要比直接使用client方式要简单很多,不需要再创建生产者和消费者,也不用再显式设置NameServer地址了(只需在配置文件中配置一下),也不用再显式的关闭生产者了。
2.1 pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
2.2 application.yml
rocketmq:
name-server: localhost:9876
producer:
group: testProducerGroup
2.3 发送消息
convertAndSend(String destination, Object payload) 发送字符串比较方便。
@RestController
public class RocketMQController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@RequestMapping("/send")
public void send() throws Exception {
rocketMQTemplate.convertAndSend("test-topic", "test message");
}
}
使用rocketMQTemplate也可以发送org.apache.rocketmq.common.message.Message
消息,需要先获取到生产者,然后通过生产者再发送。
import org.apache.rocketmq.common.message.Message;
String id = "1";
Message message = new Message("test-topic", "test-tag", id, "msg boday".getBytes());
rocketMQTemplate.getProducer().send(message);
2.4 消费消息
消费消息使用@RocketMQMessageListener注解,只需要配置消费者组名和主题即可,需要实现RocketMQListener接口。
@RocketMQMessageListener
- consumerGroup:消费者组名(必须)
- topic:主题名(必须)
- consumeMode: 消费模式(是否顺序消费)
- ConsumeMode.CONCURRENTLY(默认方式,非顺序消费)
- ConsumeMode.ORDERLY(顺序消费)
- messageModel:消息模式
- MessageModel.CLUSTERING(默认方式,集群模式:同一个消息只能被一个消费者消费) ,
- MessageModel.BROADCASTING(广播模式:所有订阅的消费者都能够获取到消息)
@Component
@RocketMQMessageListener(consumerGroup = "testConsumerGroup", topic = "test-topic")
public class TestTopicConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("消费消息:" + message);
}
}
RocketMQListener<T>
, T如果是String类型就是body, 如果是MessageExt表示的是整个MessageExt对象,MessageExt不但包含body还包含其它元数据。
@Component
@RocketMQMessageListener(consumerGroup = "testConsumerGroup", topic = "test-topic")
public class TestTopicConsumerListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
System.out.println(messageExt);
System.out.println("body:" + new String(messageExt.getBody()));
}
}