准备
Rocket部署
下载源码并构建
下载地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.3/rocketmq-all-4.9.3-source-release.zip
> unzip rocketmq-all-4.9.3-source-release.zip
> cd rocketmq-all-4.9.3/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/rocketmq-4.9.3/rocketmq-4.9.3
启动NameServer
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
启动Broker
> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...
测试消息的发送和消费
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
停止服务
如果需要停止服务,执行如下命令
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
部署RocketMQ Dashbord
为了方便管理,我们还需要Dashbord
下载地址:https://gitee.com/nswish/rocketmq-dashboard/repository/archive/master.zip
修改配置文件application.properties,增加配置rocketmq.config.namesrvAddr地址
# 省略...
rocketmq.config.namesrvAddr=localhost:9876
# 省略...
构建并启动
# Maven spring-boot run
mvn spring-boot:run
# 或 Maven build and run
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
访问:http://localhost:8080。
至此,RocketMQ已经部署完成。
项目中导入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
</dependency>
普通消息
消息发送
消息发送的步骤:
- 创建消息生产者 producer,并指定生产者组名
- 指定 Nameserver 地址
- 启动 producer
- 创建消息对象,指定 Topic、Tag 和消息体
- 发送消息
- 关闭生产者 producer
同步发送
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知等。
代码示例:
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("test_group");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
异步发送
异步消息通常用在对响应时间敏感的业务场景。
示例代码:
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("test_group");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
//启用Broker故障延迟机制
producer.setSendLatencyFaultEnable(true);
for (int i = 0; i < 100; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest", "TagA", "OrderID888",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
Thread.sleep(10000);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
单向发送
这种方式主要用在不需要关心发送结果的场景,例如日志发送。
示例代码:
public class OnewayProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer 对象。
DefaultMQProducer producer = new DefaultMQProducer("test_group");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 20; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
消息发送的权衡
发送方式 | 发送TPS | 发送结果反馈 | 可靠性 | 适用场景 |
---|---|---|---|---|
同步发送 | 快 | 有 | 可靠 | 适用广泛,如重要的消息通知,短信通知等。 |
异步发送 | 快 | 有 | 可靠 | 对响应时间敏感的应用场景 |
单向发送 | 最快 | 有 | 不 可靠 | 可靠性要求不高的场景,如日志采集 |
消息消费
- 创建消费者 Consumer,指定消费者组名
- 指定 Nameserver 地址
- 订阅主题 Topic 和 Tag
- 设置回调函数,处理消息
- 启动消费者 consumer
集群消费
一个 Consumer Group 中的各个 Consumer 实例分摊去消费消息,即一条消息只会投递到一个 Consumer Group 下面的一个实例。实际上,每个 Consumer 是平均分摊 Message Queue 的。例如,一个 Topic 有3个 Queue,其中一个Consumer Group 有3个实例,那么每个实例只消费其中一个Queue。
这种模式下,消费进度(Consumer Offset)的存储会持久化到 Broker。
代码示例,启动同一分组下的两个消费者
public class BalanceComuser {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.setMaxReconsumeTimes(1);
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
广播消费
消息将对一个 Consumer Group 下的各个 Consumer 实例都投递一遍。实际上,是一个消费组下的每个消费者实例都获取到了 topic 下面的每个 Message Queue 去拉取消费。
这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。
代码示例,启动统一分组下的两个消费者
public class BroadcastComuser {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("TopicTest", "*");
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
// 如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消息消费的权衡
集群模式
- 消费端集群化部署,每条消息只需要被处理一次。
- 由于消费进度在服务端维护,可靠性更高。
- 集群消费模式下,每一条消息都只会被分发到一台机器上处理。
广播模式
- 每条消息都需要被相同逻辑的多台机器处理。
- 消费进度在客户端维护,出现重复的概率稍大于集群模式。
- 不支持顺序消息、不支持重置消费位点。
- 广播模式下, RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
- 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
- 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
顺序消息
在默认的情况下,消息发送会采取轮询方式把消息发送到不同的 queue;而消费消息的时候是从多个 queue 上拉取消息,这种情况发送和消费是不能保证顺序的。
但是如果控制发送的顺序消息只依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉取,则就保证了顺序。当发送和消费参与的 queue 只有一个,则是全局有序;如果多个 queue 参与,则为分区有序,即相对每个 queue,消息都是有序的。
全局顺序消息:
分区顺序消息:
一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,下面是订单进行分区有序的示例代码。
public class ProducerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订单列表
List<Order> orderList = new ProducerInOrder().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < orderList.size(); i++) {
// 加个时间前缀
String body = dateStr + " Order:" + orderList.get(i);
Message msg = new Message("PartOrder", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
/**
* 订单
*/
@Getter
@Setter
@ToString
private static class Order {
private long orderId;
private String desc;
}
/**
* 生成模拟订单数据 3个订单 每个订单4个状态
*/
private List<Order> buildOrders() {
List<Order> orderList = new ArrayList<Order>();
Order orderDemo = new Order();
orderDemo.setOrderId(20210406001L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406002L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406001L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406003L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406002L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406003L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406002L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406003L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406002L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406001L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406003L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406001L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
运行结果:
SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='创建'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='创建'}
SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='付款'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='创建'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='付款'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='付款'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='推送'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='推送'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='完成'}
SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='推送'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='完成'}
SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='完成'}
使用顺序消息时,首先要保证消息是有序进入 MQ 的,对 id 等关键字进行取模后,放入指定 messageQueue中,Consume 消费消息失败时, 不能返回 reconsume_later,这样会导致乱序,应该返回 suspend_current_queue_a_moment。
消费时,同一个 OrderId 获取到的肯定是同一个队列。从而确保一个订单中处理的顺序。
public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer");
consumer.setNamesrvAddr("localhost:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("PartOrder", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
} catch (Exception e) {
e.printStackTrace();
//这个点要注意:意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
运行结果:
consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='创建'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='创建'}
consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='付款'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='创建'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='付款'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='付款'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='推送'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='推送'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='完成'}
consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='推送'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='完成'}
consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='完成'}
延时消息
Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费, 该消息即延时消息。
消息生产和消费有时间窗口要求;比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。
Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化, 那么消息排序要不可避免的产生巨大性能开销。
延迟消息是根据延迟队列的 level 来的,延迟队列默认是msg.setDelayTimeLevel(3)代表延迟 10 秒;、“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”。源码中:org/apache/rocketmq/store/config/MessageStoreConfig.java
生产者示例代码:
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
int totalMessagesToSend = 10;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("ScheduledTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后投递给消费者(详看delayTimeLevel)
// delayTimeLevel:(1~18个等级)"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
message.setDelayTimeLevel(4);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}
消费者示例代码:
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topics
consumer.subscribe("ScheduledTopic", "*");
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ (message.getStoreTimestamp() - message.getBornTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
批量消息
批量发送消息能显著提高传递消息的性能。限制是,这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过 4MB。
生产者示例代码:
public class BatchProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
producer.shutdown();
e.printStackTrace();
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
消费者示例代码:
public class BatchComuser {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchComsuer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("BatchTest", "*");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
批量切分
如果消息的总长度可能大于 4MB 时,这时候需要把消息进行分割。
生产者示例代码:
public class SplitBatchProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
//large batch
String topic = "BatchTest";
List<Message> messages = new ArrayList<>(100 * 1000);
//10万元素的数组
for (int i = 0; i < 100 * 1000; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
//把大的消息分裂成若干个小的消息(1M左右)
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List<Message> listItem = splitter.next();
producer.send(listItem);
Thread.sleep(100);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
System.out.printf("Consumer Started.%n");
}
}
class ListSplitter implements Iterator<List<Message>> {
private int sizeLimit = 1000 * 1000;//1M
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > sizeLimit) {
//单个消息超过了最大的限制(1M)
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > sizeLimit) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Not allowed to remove");
}
}
过滤消息
Tag 过滤
在大多数情况下,TAG 是一个简单而有用的设计,其可以来选择您想要的消息。
生产者示例代码:
public class TagFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("TagFilterProducer");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 60; i++) {
Message msg = new Message("TagFilterTest",
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
消费者示例代码:
public class TagFilterConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagFilterComsumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TagFilterTest", "TagA || TAGB || TAGC");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String msgPro = msg.getProperty("a");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : " + msgPro + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消费者将接收包含 TAGA 或 TAGB 或 TAGC 的消息。但是一个消息只能有一个标签。在这种情况下,可以使用 SQL 表达式筛选消息。SQL 特性可以通过发送消息时的属性来进行计算。
Sql 过滤
SQL 基本语法
RocketMQ 定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,常用的语句如下:
数值比较: 比如:>,>=,<,<=,BETWEEN,=;
字符比较: 比如:=,<>,IN; IS NULL 或者 IS NOT NULL; 逻辑符号:AND,OR,NOT;
常量支持类型为: 数值,比如:123,3.1415; 字符,比如:‘abc’,必须用单引号包裹起来; NULL,特殊的常量;布尔值,TRUE 或 FALSE
生产者示例代码:
public class SqlFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
Message msg = new Message("SqlFilterTest",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
消费者示例代码:
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer");
consumer.setNamesrvAddr("localhost:9876");
// Don't forget to set enablePropertyFilter=true in broker
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String msgPro = msg.getProperty("a");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : " + msgPro + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
如果出现如下报错错误:说明 Sql92 功能没有开启
需要修改 Broker.conf 配置文件。
加入 enablePropertyFilter=true 然后重启 Broker 服务。
事物消息
如图,事务消息分为两个流程:
- 正常事务消息的发送和提交(1,2,3,4)
- 事务消息的补偿流程(4,6,7)
正常事务流程
- 发送半事务消息。
- 服务端响应半事务消息的发送结果。
- 根据发送结果执行本地事务。如果写入失败,此时半消息对业务不可见,本地逻辑不执行。
- 根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见)。
事务补偿流程
补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。
- 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”。
- Producer 收到回查消息,检查回查消息对应的本地事务的状态。
- 根据本地事务状态,重新 Commit 或者 Rollback。
事务消息状态
事务消息共有三种状态:
TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息。
TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown:它代表需要检查消息队列来确定状态。
使用 TransactionMQProducer 类创建生产者,并指定唯一的 ProducerGroup,通过设置自定义线程池来处理事务回查请求。
执行本地事务后、需要根据执行结果对消息队列进行回复。
生产者示例代码
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("hzy_produce");
producer.setNamesrvAddr("localhost:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//设置生产者回查线程池
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。checkLocalTransaction 方法用于检查本地事务状态。
事务监听器示例代码
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}