0
点赞
收藏
分享

微信扫一扫

RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage

静悠 2022-03-30 阅读 48



文章目录

  • ​​顺序消息的概念​​
  • ​​顺序消费的原理​​
  • ​​消费状态​​
  • ​​演示​​
  • ​​Producer​​
  • ​​Consumer​​
  • ​​代码​​


RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage_ide

顺序消息的概念

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。

RocketMQ可以严格的保证消息有序,可以分为​分区有序​或者​全局有序​。

顺序消费的原理

在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);

而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。

但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。

当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

消费状态

package org.apache.rocketmq.client.consumer.listener;

public enum ConsumeOrderlyStatus {
/**
* Success consumption
*/
SUCCESS,

/**
* Suspend current queue a moment
* 不能跳过消息,等待一下
*/
SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

演示

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

Producer

package com.artisan.rocketmq.order;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
* @author 小工匠
* @version v1.0
* @create 2019-11-10 16:38
* @motto show me the code ,change the word
* @description
**/

public class OrderedProducer {

public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ordered_group_name");

producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");

producer.start();

String[] tags = new String[]{"TagA", "TagC", "TagD"};

// 订单列表
List<OrderStep> orderList = buildOrders();

Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ "+ i + " " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
body.getBytes(RemotingHelper.DEFAULT_CHARSET));

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id

System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}

producer.shutdown();
}


/**
* 生成模拟订单数据
*/
private static List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();

OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("购物车");
orderList.add(orderDemo);

return orderList;
}

/**
* 订单的步骤
*/
private static class OrderStep {
private long orderId;
private String desc;

public long getOrderId() {
return orderId;
}

public void setOrderId(long orderId) {
this.orderId = orderId;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
}

日志

RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage_ordermessage_02

Consumer

package com.artisan.rocketmq.order;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
* @author 小工匠
* @version v1.0
* @create 2019-11-10 16:38
* @motto show me the code ,change the word
* @description
**/

public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_group_name");
consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
/**
* 设置消费位置
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");
// 有序消费 MessageListenerOrderly
consumer.registerMessageListener(new MessageListenerOrderly() {

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(true);
Random random = new Random();
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume来消费, 订单对每个queue(分区)有序
try {
System.out.println("consumeThread=" + Thread.currentThread().getName()
+ ", queueId=" + msg.getQueueId()
+ ", content:" + new String(msg.getBody(),
RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}

try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;

}
});

consumer.start();

System.out.printf("Consumer Started.%n");
}
}

运行日志

RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage_ordermessage_03

代码

请移步:​​https://github.com/yangshangwei/rocketmqMaster​​


举报

相关推荐

0 条评论