角色
1、 broker
- Broker面向producer和consumer接受和发送消息
- 向nameserver提交自己的信息
- 是消息中间件的消息存储、转发服务器。
- 每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报。
broker集群
- Broker高可用,可以配成Master/Slave结构,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave。
- 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master
- Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义BrokerId为0表示Master,非0表示Slave
- Master多机负载,可以部署多个broker
- 每个Broker与nameserver集群中的所有节点建立长连接,定时注册Topic信息到所有nameserver。
2、producer
- 消息的生产者
- 通过集群中的其中一个节点(随机选择)建立长连接,获得Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等
- 接下来向提供Topic服务的Master建立长连接,且定时向Master发送心跳
3、consumer
-
消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。
-
注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。
4、nameserver
-
底层由netty实现,提供了路由管理、服务注册、服务发现的功能,是一个 无状态(无主从) 节点
-
nameserver是服务发现者,集群中各个角色(producer、broker、consumer等)都需要定时想nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把它从列表中剔除
-
nameserver可以部署多个,当多个nameserver存在的时候,其他角色同时向他们上报信息,以保证高可用,
-
NameServer集群间互不通信,没有主备的概念
-
nameserver内存式存储,nameserver中的broker、topic等信息默认不会持久化
-
为什么不用zookeeper?:rocketmq希望为了提高性能,CAP定理,客户端负载均衡
-
- nameserver并不保证所有的nameserver的数据一致;牺牲数据一致性,保证高可用;
5、Topic
对比JSM中的Topic和Queue:
Topic是一个逻辑上的概念,实际上Message是在每个Broker上以Queue的形式记录。
消息发送方式
1、同步发送
- 单条消息发送:
Message msg = new Message("topic001", "body 第1条".getBytes());
//同步消息发送 阻塞发送
SendResult sendResult = producer.send(msg1);
- 批量消息发送:
Message msg1 = new Message("topic001", "body 第1条".getBytes());
Message msg2 = new Message("topic001", "body 第2条".getBytes());
Message msg3 = new Message("topic001", "body 第3条".getBytes());
//支持批量消息发送
ArrayList<Message> list = new ArrayList<>();
list.add(msg1);
list.add(msg2);
list.add(msg3);
//同步消息发送 阻塞发送
SendResult sendResult = producer.send(list);
- 单向消息
Message message = new Message("topic001", "body 单向第1条".getBytes());
//单向消息
//有可能丢消息
producer.sendOneway(message);
producer.shutdown();
System.out.println("Producer shutdown...");
2、异步发送
//异步可靠消息
//不会阻塞,等待broker的确认
//采用事件监听方式接受broker返回的确认
Message message = new Message("topic001", "body 异步第2条".getBytes());
//设置异步发送失败重投时间
producer.setRetryTimesWhenSendAsyncFailed(1000);
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息发送成功。。。");
System.out.println("sendResult:" + sendResult);
}
@Override
public void onException(Throwable throwable) {
//如果发送异常 case异常,尝试重投
//或者调整业务逻辑
throwable.printStackTrace();
System.out.println("发送异常。。。");
}
});
消息消费模式
-
集群消费:
1、 一组consumer同时消费一个topic,可以分配消费负载均衡策略分配consumer对应消费topic下的哪些queue
2、 多个group同时消费一个topic时,每个group都会消费到数据
3、一条消息只会被group中的一个consumer消费,
-
广播消费
1、消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
2、消费进度由consumer维护;
3、消费失败不会重新投递;
MessageSelector
TagFilter
- producer
DefaultMQProducer producer = new DefaultMQProducer("producer1");
//设置nameserver地址
producer.setNamesrvAddr("192.168.100.100:9876");
producer.start();
//topic 消息将要发送的地址
//body 消息中的数据
Message msg1 = new Message("topic002", "tag-b", "key-1", "body 过滤第1条".getBytes());
//同步消息发送 阻塞发送
SendResult sendResult = producer.send(msg1);
System.out.println("sendResult:" + sendResult);
producer.shutdown();
- consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
consumer.setNamesrvAddr("192.168.100.100:9876");
//每个consumer关注一个topic
//过滤器 * 表示不过滤
consumer.subscribe("topic002", "tag-b");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println("consumeMessage start...");
for (MessageExt msg : msgs){
System.out.println(new String(msg.getBody()));
}
//默认情况下 这条消息只会被一个consumer消费到 点对点
//message 状态修改
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//MessageModel.BROADCASTING 广播模式 可以多个consumer消费同一个消息 只广播一次 不会重投
//MessageModel.CLUSTERING 集群消费模式(默认) 一个消息一组consumer只消费一次 会重投
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.start();
SQLFilter
- producer
DefaultMQProducer producer = new DefaultMQProducer("producer3");
//设置nameserver地址
producer.setNamesrvAddr("192.168.100.100:9876");
producer.start();
//topic 消息将要发送的地址
//body 消息中的数据
for (int i = 0; i < 100; i++) {
Message msg1 = new Message("topic003", "tag-c", "key-1", ("body SQL过滤第1条"+i).getBytes());
msg1.putUserProperty("age", i+"");
producer.send(msg1);
}
producer.shutdown();
- consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer3");
consumer.setNamesrvAddr("192.168.100.100:9876");
//每个consumer关注一个topic
//过滤器 * 表示不过滤
MessageSelector selector = MessageSelector.bySql("age>= 18 and age <=28");
consumer.subscribe("topic003", selector);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println("consumeMessage start...");
for (MessageExt msg : msgs){
System.out.println(new String(msg.getBody()));
}
//默认情况下 这条消息只会被一个consumer消费到 点对点
//message 状态修改
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//MessageModel.BROADCASTING 广播模式 可以多个consumer消费同一个消息 只广播一次 不会重投
//MessageModel.CLUSTERING 集群消费模式(默认) 一个消息一组consumer只消费一次 会重投
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.start();
事务消息
- 事务消息执行流程图
- producer
TransactionMQProducer producer = new TransactionMQProducer("producerTrans");
//设置nameserver地址
producer.setNamesrvAddr("192.168.100.100:9876");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object o) {
//执行本地事务
System.out.println("executeLocalTransaction:");
System.out.println("msg:" + new String(msg.getBody()));
System.out.println("transactionId:" + msg.getTransactionId());
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//broker端回调,检查事务
System.out.println("checkLocalTransaction:");
System.out.println("msg:" + new String(msg.getBody()));
System.out.println("transactionId:" + msg.getTransactionId());
//事务执行成功
// return LocalTransactionState.COMMIT_MESSAGE;
//继续等待
return LocalTransactionState.UNKNOW;
//回滚消息
// return LocalTransactionState.ROLLBACK_MESSAGE;
}
});
producer.start();
//异步可靠消息
//不会阻塞,等待broker的确认
//采用事件监听方式接受broker返回的确认
Message message = new Message("topicTrans", "body 事务消息第2条".getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(message, null);
System.out.println("result:"+result);
- 执行结果
executeLocalTransaction:
msg:body 事务消息第2条
transactionId:7F000001595418B4AAC232D058D90000
result:SendResult [sendStatus=SEND_OK, msgId=7F000001595418B4AAC232D058D90000, offsetMsgId=null, messageQueue=MessageQueue [topic=topicTrans, brokerName=node1, queueId=3], queueOffset=81]
checkLocalTransaction:
msg:body 事务消息第2条
transactionId:7F000001595418B4AAC232D058D90000
Half Message:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
检查事务状态:Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。
超时:如果超过回查次数,默认回滚消息
TransactionListener的两个方法
-
executeLocalTransaction
半消息发送成功触发此方法来执行本地事务
-
checkLocalTransaction
broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态
默认:每一分钟回调一次,一共回调15次;15次为返回执行成功状态,则默认失败;
本地事务执行状态
-
LocalTransactionState.COMMIT_MESSAGE
执行事务成功,确认提交
-
LocalTransactionState.ROLLBACK_MESSAGE
回滚消息,broker端会删除半消息
-
LocalTransactionState.UNKNOW
暂时为未知状态,等待broker回查
顺序消费
-
同一个topic
-
同一个queue (默认一个topic有4个queue)
-
同一个线程发送
-
同一个线程消费
-
producer
DefaultMQProducer producer = new DefaultMQProducer("producer1");
//设置nameserver地址
producer.setNamesrvAddr("192.168.100.100:9876");
producer.start();
//topic 消息将要发送的地址
//body 消息中的数据
for (int i = 0; i < 98; i++) {
Message msg1 = new Message("topicSelectQueue", ("body 第"+ i +"条").getBytes());
SendResult sendResult = producer.send(msg1, new MessageQueueSelector() {
@Override
//queue选择器,手动选择一个queue
public MessageQueue select(
//当前topic里面包含的所有queue
List<MessageQueue> list,
//具体要发送的消息
Message message,
//对应send()里面的arg参数
Object o) {
//获取一个固定的queue,向固定的queue里面发送消息
MessageQueue queue = list.get((Integer)(o));
return queue;
}
},0,2000);
producer.shutdown();
- consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerSelectQueue");
consumer.setNamesrvAddr("192.168.100.100:9876");
//每个consumer关注一个topic
//过滤器 * 表示不过滤
consumer.subscribe("topicSelectQueue", "*");
//对大消费线程数
// consumer.setConsumeThreadMax(10);
//对小消费线程数
// consumer.setConsumeThreadMin(1);
//MessageListenerOrderly 顺序消费监听 对一个queue开启一个线程,多个queue开启多个线程
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt msg : msgs){
System.out.println("thread:" + Thread.currentThread().getName()+" msg:"+new String(msg.getBody()));
System.out.println("queueId:"+msg.getQueueId());
}
//默认情况下 这条消息只会被一个consumer消费到 点对点
//message 状态修改
return ConsumeOrderlyStatus.SUCCESS;
}
});
//MessageModel.BROADCASTING 广播模式 可以多个consumer消费同一个消息 只广播一次 不会重投
//MessageModel.CLUSTERING 集群消费模式(默认) 一个消息一组consumer只消费一次 会重投
// consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.start();
System.out.println("Consumer1 start...");