RocketMQ分布式消息队列
一.RocketMQ的认识
1.为什么用MQ(重要)
当我们秒杀商品的时候会出现大量的客户端访问商品页面,进行结算和抢购订单等业务,由于并发很高,并且费用结算等业务本来就耗时,况且支付服务也不一定能承担那么大的请求量,就会出现卡顿。
当服务器线程耗尽,后续请求会等待变慢,再加上高并发请求就会导致后续请求越来越慢,请求长时间等待,导致大量请求超时。并发太高,可能会导致服务器的内存上升,CPU使用率急速上升,甚至导致服务器宕掉。
解决方案:使用MQ消峰,效果如下
加入MQ后的效果
- 高并发请求在MQ中排队,达到了消除峰值的目的,不会有大量的请求同时怼到支付系统
- 服务异步调用,“商品秒杀API” 把结算消息放入MQ就可以返回“订单成功和订单发货信息”给用户,响应时间很快
- 服务彻底解耦,即使支付服务挂掉,也不影响“商品秒杀API”正常工作,当支付系统再启动仍然可以继续消费MQ中的消息。
2.MQ是什么
MQ全称为Message Queue,即消息队列 ,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生 产、存储、消费全过程的软件系统,遵循FIFO原则。
MQ的使用场景(重要)
-
限流削峰
MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统 被压垮。
-
异步&解耦
上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高。 而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。 即使消费者挂掉也不影响生产者工作,只要把消息放入队列即可,消费者重启后自己消费即可。
-
数据收集
分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或 批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此 类数据收集是最好的选择。
-
大数据处理
比如我们的平台向“三方平台”获取数据,一次请求了大量数据回来要进行处理,由于数据较多处理不过来,那么就可以放入MQ,再创建一些消费者进行数据处理即可。
【注意
】如下情况不太适合MQ
- 小项目,体量不大,并发量低的使用MQ会太过笨重 - 你可以考虑使用Redis做一个消息队列。
- 对数据的一致性有要求(强一致性)的的场景不适合使用MQ,因为MQ是异步的。
使用MQ的好处(了解)
-
提高系统响应速度
任务异步处理。 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
-
提高系统稳定性
一是并发被消峰后,系统不容易被高并发打垮,二是系统挂了也没关系,操作内容放到消息队列不丢失,后续重新消费者一样能消费做业务处理。
-
排序保证 FIFO
遵循队列先进先出的特点,能够保证消息按照添加的数据被消费。
常见MQ产品(了解)
-
ActiveMQ
ActiveMQ是使用Java语言开发一款MQ产品。早期很多公司与项目中都在使用。但现在的社区活跃度已 经很低。现在的项目中已经很少使用了。
-
RabbitMQ
RabbitMQ是使用ErLang语言开发的一款MQ产品。其吞吐量较Kafka与RocketMQ要低,且由于其不是 Java语言开发,所以公司内部对其实现定制化开发难度较大。
-
Kafka
Kafka是使用Scala/Java语言开发的一款MQ产品。其最大的特点就是高吞吐率,常用于大数据领域的实 时计算、日志采集等场景。其没有遵循任何常见的MQ协议,而是使用自研协议。对于Spring Cloud Netflix,其仅支持RabbitMQ与Kafka。
-
RocketMQ
RocketMQ是使用Java语言开发的一款MQ产品。经过数年阿里双11的考验,性能与稳定性非常高。其 没有遵循任何常见的MQ协议,而是使用自研协议。对于Spring Cloud Alibaba,其支持RabbitMQ、 Kafka,但提倡使用RocketMQ
下面是MQ的对比图:
技术选型建议:
- 业务场景简单,允许数据丢失,想要快速上线,推荐使用Redis
- 大数据场景,日志收集,实时性要求高,推荐Kafka
- 金融领域,不能接受消息丢失或重复,推荐使用RabbitMQ或者RocketMQ
MQ常见协议(了解)
- AMQP协议
AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式, 为的就是解决MQ市场上协议不统一的问题。基于此协议的客户端与消息中间件可传递 消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制
,RabbitMQ就是遵循AMQP标准协议开发的MQ服务。 官方:http://www.amqp.org - JMS协议
JMS是Java消息服务,是java提供的一套消息服务API标准
,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的 jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么 不同,jms是java语言专属的消 息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的 。 - STOMP
STOMP,Streaming Text Orientated Message Protocol(面向流文本的消息协议),是一种MOM设计
的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。ActiveMQ是该协议的典型实现,RabbitMQ通过插件可以支持该协议
。 - MQTT
MQTT,Message Queuing Telemetry Transport(消息队列遥测传输),是IBM开发的一个即时通讯协 议,是一种二进制协议,主要用于服务器和低功耗IoT(物联网
)设备间的通信。该协议支持所有平 台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器的通信协议。 RabbitMQ通 过插件可以支持该协议。
二.RocketMQ介绍
1.RocketMQ是什么
RocketMQ是一个统一消息引擎、轻量级数据处理平台。
RocketMQ是⼀款阿⾥巴巴开源的消息中间件
,双十一承载了万亿级消息的流转,2016年11⽉,阿⾥巴巴向 Apache 软件基⾦会捐赠 RocketMQ,成为 Apache 孵化项⽬,2017 年 9 ⽉ ,Apache 宣布 RocketMQ孵化成为 Apache 顶级项⽬(TLP )成为国内⾸个互联⽹中间件在 Apache 上的顶级项⽬。
2.RocketMQ特征
-
支持集群模型、负载均衡、水平扩展能力
-
亿级别消息堆积能力
-
采用零拷贝的原理,顺序写盘,随机读
-
底层通信框架采用Netty NIO
-
NameServer代替Zookeeper,实现服务寻址和服务协调
-
消息失败重试机制、消息可查询
-
强调集群无单点,可扩展,任意一点高可用,水平可扩展
-
经过多次双十一的考验
三.RocketMQ的安装
1.下载RocketMQ
下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.2.0/
下载解压后
2.配置ROCKETMQ_HOME
在环境变量中配置RocketMQ的地址
3.启动MQ
1.启动NameServer
Cmd命令框执行进入至MQ文件夹\bin
下,然后执行 start mqnamesrv.cmd
,启动NameServer。成功后会弹出提示框,此框勿关闭。
2. 启动Broker
进入至MQ文件夹\bin
下,修改Bean目录下的 runbroker.cmd
中JVM占用内存大小
CMD执行start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
,启动Broker。成功后会弹出提示框,此框勿关闭。
4.RocketMQ存储结构[了解]
RocketMQ安装好之后会在用户目录下产生一个store目录
用来存储相关数据:
- Commitlog : 消息是存储,在commitlog目录中,以mapperdFile文件顺序存储消息。
- Config : 存放运行期间的配置文件
- Consumerqueue : 该目录中存放的是队列,consume queue存放着commitlog中的消息的索引位置
- Index :存放着消息索引文件 indexFile,用来实现根据key进行消息的快速查询
- Abort : 该文件在broker启动后自动创建,正常关闭abort会消失
- Checkpoint :记录 Commitlog ,Consumerqueue 和index 文件的最后刷盘时间戳
[问
]RocketMQ数据存储在磁盘会影响性能吗?
不会,RocketMQ的性能在所有的MQ中是比较高的,主要是因为RocketMQ使用了mmap零拷贝技术,consumequeue中的数据是顺序存放的,还引入了PageCache的预读取机制,使得对 consumequeue文件的读取几乎接近于内存读取,即使在有消息堆积情况下也不会影响性能。
四.RocketMQ插件
为了方便管理,我们需要安装一个可视化插件
1.下载插件
RocketMQ可视化管理插件下载地址:https://github.com/apache/rocketmq-externals/releases
2.修改配置
解压后,修改配置:src/main/resource/application.properties ,这里需要指向Name Server 的地址和端口 如下:
3.打包插件
回到安装目录(pom.xml所在目录),执行: mvn clean package -Dmaven.test.skip=true
,然后会在target目录生成打包后的jar文件
4.启动插件
进入 target 目录,CMD执行 java -jar rocketmq-console-ng-1.0.0.jar
, 访问 http://localhost:8080
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UfQ9CDCV-1647228628695)(课件图片/1634807497458.png)]4.启动插件
进入 target 目录,CMD执行 java -jar rocketmq-console-ng-1.0.0.jar
, 访问 http://localhost:8080
五.RocketMQ的原理
1.RokcetMQ架构
RocketMQ开发官方文档:
https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
RocketMQ的集群架构如下
RocketMQ架构上主要分为四部分,如上图所示
1.1.Producer
消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
1.2.Consumer
消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
1.3.Broker
Broker主要负责消息的存储、投递和查询以及服务高可用保证。
1.4.NameServer
NameServer是一个Broker与Topic路由的注册中心支持Broker的动态注册与发现,主要包括两个功能
-
Broker管理
NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活。
-
路由信息管理
每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费
2.RocketMQ入门
官方案例:https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
2.1.导入依赖
注意和安装的MQ版本一致
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
2.2.生产者
步骤分析
- 创建producer组
- 设置NameServer地址 : 如果实在安装不上,可以使用这个地址:115.159.88.63:9876
- startr生产者
- 发送消息获取结果
- 结束producer
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建生产者组
DefaultMQProducer producer = new DefaultMQProducer("producer-hello");
//2.设置NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
//3.启动producer实例
producer.start();
//4.创建消息
Message message = new Message("log-topic", "info-tag", "这是一个info信息".getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.发送消息
SendResult result = producer.send(message);
//6.关闭producer实例
System.out.println("发送完毕,结果: "+result);
}
}
代码解释:
-
DefaultMQProducer : MQ生产者 , 可以指定组名 producerGroupName
-
producer.setNamesrvAddr : 指定Name Server地址,用作Brocker发现。注意IP和启动name server服务时指定的IP保持一致。
-
producer.start() : 启动 生产者
-
new Message(“topic_log”,“tags_error”,(“我是消息”+i).getBytes()) :消息,参数为:topic,tags,内容
-
producer.send(message) : 发送消息
-
SendResult :发送结果,其中包含
- sendStatus=SEND_OK :发送状态
- msgId :producer 创建的消息ID
- offsetMsgId :Brocker创建的消息ID
- messageQueue :消息存储的队列
- producer.shutdown():关闭生产者
2.3.消费者
-
创建consumer组
-
设置Name Server地址
-
设置消费位置,从最开始销毁
-
设置消息回调处理监听 -> 处理消息
-
Start consumer
public class Consumer {
public static void main(String[] args) throws MQClientException {
//1.创建消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-hello");
//2.设置NameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//3.订阅topic,指定tag标签
consumer.subscribe("log-topic","info-tag");
//4.注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently(){
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s 接收到新的消息: %n", Thread.currentThread().getName());
msgs.stream().forEach(messageExt -> {
String body = null;
try {
body = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println(body);
});
//失败消费,稍后尝试消费,会进行多次重试
//return ConsumeConcurrentlyStatus.RECONSUME_LATER;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者
consumer.start();
System.out.println("消费者启动...");
}
}
- DefaultMQPushConsumer :消费者 , 可以指定 consumerGroupName
- consumer.setNamesrvAddr : 设置name server 地址
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET) :从什么位置开始消费
- consumer.subscribe(“topic_log”, “tags_error”) :订阅某个topic下的某个tags的消息
- consumer.registerMessageListener :注册消息监听器,拿到消息后,进行消息处理。
- ConsumeConcurrentlyStatus :消费者消费结果状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS代表成功,ConsumeConcurrentlyStatus.RECONSUME_LATER代表消费失败,稍后重试,会进行多次重试