0
点赞
收藏
分享

微信扫一扫

RocketMQ原理学习--消息类型

逸省 2022-08-26 阅读 126


一、消费模式

集群消费:当使用集群消费模式时,MQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。
广播消费:当使用广播消费模式时,MQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。

集群消费模式:

RocketMQ原理学习--消息类型_i++

适用场景&注意事项

  • 消费端集群化部署,每条消息只需要被处理一次。
  • 由于消费进度在服务端维护,可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理,如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证消息的每一次失败重投等逻辑都能路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

广播消费模式:

RocketMQ原理学习--消息类型_客户端_02

适用场景&注意事项

  • 顺序消息暂不支持广播消费模式。
  • 每条消息都需要被相同逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复的概率稍大于集群模式。
  • 广播模式下,MQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
  • 广播模式下,第一次启动时默认从最新消息消费,客户端的消费进度是被持久化在客户端本地的隐藏文件中,因此不建议删除该隐藏文件,否则会丢失部分消息。
  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  • 广播模式下服务端不维护消费进度,所以 MQ 控制台不支持消息堆积查询和堆积报警功能。

代码示例:

设置集群消息:consumer.setMessageModel(MessageModel.CLUSTERING);

设置广播消息:consumer.setMessageModel(MessageModel.BROADCASTING);

public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
consumer.setNamesrvAddr("localhost:9876");

//集群消费者
//consumer.setMessageModel(MessageModel.CLUSTERING);
//广播消费者
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicA-test", "TagA");

consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}

二、普通消息、事务消息、顺序消息、延时消息

RocketMQ 针对不同的业务场景还提供了普通消息、事物消息、顺序消息和延时消息等几种消息类型。

1、普通消息

普通消息也叫做无序消息,简单来说就是没有顺序的消息,producer 只管发送消息,consumer 只管接收消息,至于消息和消息之间的顺序并没有保证,可能先发送的消息先消费,也可能先发送的消息后消费。

举个简单例子,producer 依次发送 order id 为 1、2、3 的消息到 broker,consumer 接到的消息顺序有可能是 1、2、3,也有可能是 2、1、3 等情况,这就是普通消息。

因为不需要保证消息的顺序,所以消息可以大规模并发地发送和消费,吞吐量很高,适合大部分场景。

示例:

生产者

public class Producer {

public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
try {
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicA-test",// topic
"TagA",// tag
(new Date() + "Hello RocketMQ ,QuickStart 11" + i)
.getBytes()// body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}

}

消费者

public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
consumer.setNamesrvAddr("localhost:9876");
//consumer.setInstanceName("rmq-instance2");
consumer.subscribe("TopicA-test", "TagA");

consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}

2、事物消息

MQ 的事务消息交互流程如下图所示:

​​

RocketMQ原理学习--消息类型_客户端_03

​​

采用2PC提交:

第一阶段是:步骤1,2,3。
第二阶段是:步骤4,5。

RocketMQ原理学习--消息类型_i++_04

生产者:

public class TransactionProducer {

public static void main(String [] args) throws Exception{

final TransactionMQProducer producer = new TransactionMQProducer("rmq-transaction");
producer.setNamesrvAddr("localhost:9876");

//事务回查最小并发数

producer.setCheckThreadPoolMinSize(5);

//事务回查最大并发数

producer.setCheckThreadPoolMaxSize(20);

//队列数

producer.setCheckRequestHoldMax(2000);

producer.start();

//服务器回调producer,检查本地事务分支成功还是失败
producer.setTransactionCheckListener(new TransactionCheckListener() {

@Override
public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {

System.out.println("state --" + new String(messageExt.getBody()));

return LocalTransactionState.COMMIT_MESSAGE;

}

});

TransactionExecuterImpl transactionExecuter = new TransactionExecuterImpl();

for (int i = 0; i < 2; i++) {

Message msg = new Message("TopicTransaction",

"Transaction" + i,

("Hello RocketMq" + i).getBytes()

);

SendResult sendResult = producer.sendMessageInTransaction(msg, transactionExecuter, "tq");

System.out.println(sendResult);

TimeUnit.MICROSECONDS.sleep(1000);

}

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

@Override
public void run() {

producer.shutdown();

}

}));

System.exit(0);

}
}

执行本地事物:

public class TransactionExecuterImpl implements LocalTransactionExecuter {

@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {

System.out.println("msg=" + new String(msg.getBody()));

System.out.println("arg = "+arg);

String tag = msg.getTags();

if (tag.equals("Transaction1")){

//这里有一个分阶段提交的概念

System.out.println("这里是处理业务逻辑,失败情况下进行ROLLBACK");

return LocalTransactionState.ROLLBACK_MESSAGE;

}

return LocalTransactionState.COMMIT_MESSAGE;

//return LocalTransactionState.UNKNOW;

}

}

消费者:

public class TransactionConsumer {

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-transaction");
consumer.setNamesrvAddr("localhost:9876");
//consumer.setInstanceName("rmq-instance2");
consumer.subscribe("TopicTransaction", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}

3、顺序消息

有序消息就是按照一定的先后顺序的消息类型。

举个例子来说,producer 依次发送 order id 为 1、2、3 的消息到 broker,consumer 接到的消息顺序也就是 1、2、3 ,而不会出现普通消息那样的 2、1、3 等情况。

那么有序消息是如何保证的呢?我们都知道消息首先由 producer 到 broker,再从 broker 到 consumer,分这两步走。那么要保证消息的有序,势必这两步都是要保证有序的,即要保证消息是按有序发送到 broker,broker 也是有序将消息投递给 consumer,两个条件必须同时满足,缺一不可。
进一步还可以将有序消息分成

  • 全局有序消息
  • 局部有序消息

实现原理:由于生产者默认是轮询获取MessageQueue队列(每个Topic默认初始化4个MessageQueue),然后将消息轮询发送到不同的MessageQueue中,消息者从MessageQueue中获取数据时很可能是无序的。

局部有序消息:将相同顺序的消息发送到同一个MessageQueue队列,这样消费者从队列中获取数据肯定是相对有序的。

全局有序消息:将所有的消息发送到一个MessageQueue队列,消费者从单个队列中拉取消息,消息有序。

生产者:实现MessageQueueSelector接口,相同顺序的消息获取同一个MessageQueue

public class OrderProducer {

public static void main(String[] args) throws Exception {
try {
DefaultMQProducer producer = new DefaultMQProducer("order_Producer");
producer.setNamesrvAddr("localhost:9876");

producer.start();

for (int i = 1; i <= 5; i++) {

Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, 0);

System.out.println(sendResult);
}
for (int i = 1; i <= 5; i++) {

Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes());

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, 1);

System.out.println(sendResult);
}
for (int i = 1; i <= 5; i++) {

Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes());

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, 2);

System.out.println(sendResult);
}

producer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

消费者:设置消息监听器为顺序消息监听器MessageListenerOrderly

public class OrderConsumer {


public static void main(String[] args) throws MQClientException {


DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
consumer.setNamesrvAddr("localhost:9876");

/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicOrderTest", "*");

/**
* 实现了MessageListenerOrderly表示一个队列只会被一个线程取到
*,第二个线程无法访问这个队列
*/
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);

public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 设置自动提交
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println(msg + ",内容:" + new String(msg.getBody()));
}

try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {

e.printStackTrace();
}
;

return ConsumeOrderlyStatus.SUCCESS;
}
});

consumer.start();

System.out.println("Consumer1 Started.");
}


}

4、延时消息

延时消息,简单来说就是当 producer 将消息发送到 broker 后,会延时一定时间后才投递给 consumer 进行消费。

RcoketMQ的延时等级为:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。level=0,表示不延时。level=1,表示 1 级延时,对应延时 1s。level=2 表示 2 级延时,对应5s,以此类推。

这种消息一般适用于消息生产和消费之间有时间窗口要求的场景。比如说我们网购时,下单之后是有一个支付时间,超过这个时间未支付,系统就应该自动关闭该笔订单。那么在订单创建的时候就会就需要发送一条延时消息(延时15分钟)后投递给 consumer,consumer 接收消息后再对订单的支付状态进行判断是否关闭订单。

设置延时非常简单,只需要在Message设置对应的延时级别即可

生产者:

public class DelayProducer {

public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
try {
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicA-test",// topic
"TagA",// tag
(new Date() + "Hello RocketMQ ,QuickStart 11" + i)
.getBytes()// body
);
//1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。
// level=0,表示不延时。level=1,表示 1 级延时,对应延时 1s。level=2 表示 2 级延时,对应5s,以此类推
msg.setDelayTimeLevel(2);

SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}

}

 

 

参考博客:

​​https://help.aliyun.com/document_detail/29548.html?spm=a2c4g.11186623.6.575.42512de7YsiiZ5​​

​​https://www.jianshu.com/p/11e875074a8f​​

举报

相关推荐

0 条评论