目录
- 什么是MQ
- 为什么要使用MQ
- MQ的使用场景
- MQ如何选型
- 如何消息队列保证高可用(消息队列的集群模式)
- RocketMQ如何保证消息不丢失(重要)
- RocketMQ如何保证消息的顺序性
- 如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?
- 如果让你设计一个消息队列,你会怎么设计架构
什么是MQ
消息队列中间件,是一种跨进程的通信机制,用于上下游传递消息。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。
MQ的优点
异步、解耦、削峰
异步
不是很重要的操作,可以通过异步操作来实现。(当然线程池也可以实现,但没有解耦的特点)
解耦
每加一个下游操作,都需要改上游的代码,重新部署,太麻烦了;解耦的意思是,下游直接订阅MQ,上游不需要改代码,就很不错
削峰
把请求放到队列里面,然后至于每秒消费多少请求,就看自己的服务器(一般是数据库)处理能力,等流量高峰下去了,你的服务也就没压力了(短暂的请求积压是允许的)
MQ的缺点
- 系统的可用性降低:万一MQ突然挂了,上下游就没办法交互了,导致上游不可用
- 系统复杂性提高:如何保证没有重复消费呢?如何处理消息丢失的情况?怎么保证消息传递的顺序?(下面一一补充)
- 一致性问题:上游无法知道下游的执行结果,这一点是很致命的,分布式事务问题
为什么要使用MQ
我们公司本身的业务体量很小,所以直接单机一把梭啥都能搞定了,但是后面业务体量不断扩大,采用微服务的设计思想,分布式的部署方式,所以拆分了很多的服务,随着体量的增加以及业务场景越来越复杂了,很多场景单机的技术栈和中间件以及不够用了,而且对系统的友好性也下降了,最后做了很多技术选型的工作,我们决定引入消息队列中间件。
MQ的使用场景
原文地址:https://blog.csdn.net/qq_35152037/article/details/80012961
场景一:数据驱动的任务依赖(任务的执行需要顺序)
例子
什么是任务依赖,举个栗子,互联网公司经常在凌晨进行一些数据统计任务,这些任务之间有一定的依赖关系,比如:
- task3需要使用task2的输出作为输入
- task2需要使用task1的输出作为输入
这样的话,tast1, task2, task3之间就有任务依赖关系,必须task1先执行,再task2执行,载task3执行。
原解决方案(cron人工排执行时间表)
定时任务之间设置预留时间,确保上一个任务执行完再进行下一个任务
1.每个任务执行的时间不确定,在任务之间需要有预留时间
2.如果任务执行快了,但预留时间太多了,下一个任务没法很快执行,浪费时间
导致的问题(时间资源的浪费)
MQ方案
好处
- 不需要预留buffer,上游任务执行完,下游任务总会在第一时间被执行
- 解耦:依赖多个任务,被多个任务依赖都很好处理,只需要订阅相关消息即可
- 有任务执行时间变化,下游任务都不需要调整执行时间(和第一点一样)
场景二:上游不关心执行结果(异步、解耦)
例子
拿下单来说,下单之后一般会有用户增加积分、发送短信等等操作,这些操作可以晚点再发,对用户来说只要不影响正常下单就行
原实现方案(调用关系,依次执行)
下单之后,调用积分服务,增加积分,调用短信服务发送短信。
导致的问题
- 主业务耗时增加:下单时间增加了,而且随着附属业务功能越来越多,时间会越来越久
- 系统可用性降低,万一下游的一个服务挂了,会导致下单失败
MQ方案
- 下单业务成功后,给MQ发送一个消息
- 积分业务、短信业务等等去MQ上主动订阅《下单成功》消息
场景三:上游关注执行结果,但执行时间很长
下单操作,需要调用微信系统(跨公网调用,请求时间会比较长),延长了下单的时间
MQ方案(回调网关+MQ)
一般采用“回调网关+MQ”方案来解耦:
- 调用方直接跨公网调用微信接口
- 微信直接返回调用成功(此时并不代表返回成功)
- 微信执行完成后,回调统一网关
- 网关将返回结果发给MQ
- 上游订阅MQ,收到结果通知
为什么需要网关还需要MQ
这里需要注意的是,不应该由回调网关来调用上游来通知结果,如果是这样的话,每次新增调用方,回调网关都需要修改代码,仍然会反向依赖,使用回调网关+MQ的方案,新增任何对微信支付的调用,都不需要修改代码啦
MQ如何选型
推荐看看:敖丙之消息队列(mq)是什么?
比较维度
吞吐量
万级的 ActiveMQ 和 RabbitMQ 的吞吐量(ActiveMQ 的性能最差)要比 十万级甚至是百万级的 RocketMQ 和 Kafka 低一个数量级。
- ActiveMQ、RabbitMQ 万级别
- RocketMQ 十万级别
- Kafka 百万级别
可用性(高可用)
都可以实现高可用。ActiveMQ 和 RabbitMQ 都是基于主从架构实现高可用性。RocketMQ 基于分布式架构。 Kafka也是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用。
- 基于主从架构:ActiveMQ、RabbitMQ
- 基于分布式架构:RocketMQ、Kafka(分布式架构和主从架构有什么区别,是集群模式?类似于redis集群?)
时效性(修改后立马见效的意思)
RabbitMQ 基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。其他三个都是 ms 级。
- 微秒级:RabbitMQ
- 毫秒级:ActiveMQ、RocketMQ、Kafka
功能支持(了解)
除了 Kafka,其他三个功能都较为完备。 Kafka 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准。
消息丢失
ActiveMQ 和 RabbitMQ 丢失的可能性非常低, RocketMQ 和 Kafka 理论上不会丢失。
- 小概率丢失:ActiveMQ、RabbitMQ
- 不会丢失:RocketMQ、Kafka
总结
ActiveMQ
ActiveMQ的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用。
RabbitMQ(基于 erlang 开发,JAVA人员看不懂)
RabbitMQ在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做 erlang 源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级??这还不高。。),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
RocketMQ(用JAVA开发,快乐来源)
RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的 MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准 JMS 规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用 RocketMQ 挺好的
Kafka(大数据)
Kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略,这个特性天然适合大数据实时计算以及日志收集。
原文链接:JavaGuide之常见的消息队列对比
如何消息队列保证高可用(消息队列的集群模式)
以RocketMQ为例
1. 几个概念
Topic消息类型(大类)
是相当于一种消息类型
Queue队列(明细类)
是属于某个Topic下的更细分的一种单元
消费者组
消费者和消费者组属于个体与群体的关系
原文链接:rocketMQ中,消费者、消费者组、Topic、队列的关系
2. RocketMQ消息的存储结构(ConsumeQueue + CommitLog)
ConsumeQueue 逻辑队列(类似索引,指向真的文件CommitLog)
消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个 Topic下的每个 Message Queue都有一个对应的 ConsumeQueue文件
CommitLog (提交日志?这名字也太怪了)刷盘后存在磁盘上的文件
消息真正的物理存储文件,每台 Broker上的 CommitLog被本机器所有 ConsumeQueue 共享。
3. 思路
在实际生产环境中,一般需要服务达到高可用、无单节点故障的要求。在 rocketMq 中
就需要分布式部署。
RocketMQ的核心就是Broker的消息存储,而高可用的关键也在于Broker。
4. NameServer(类似于注册中心,管理Broker的)
因为NameServer是无状态的,如果要使用集群部署,只要由一台NameServer可用,那么集群就整体可用。
5. Broker(存储消息的服务)
一个很重要的消息存储模块
1. 作用
- 接收Producer发过来的消息
- 消息的持久化存储
- 处理Consumer的消费消息请求
- 消息的 HA 机制以及服务端过滤功能(之后了解TODO)
实际存储消息的服务,服务不可用可能导致消息丢失
2. 集群数量配置(推荐使用双主双从)
就是Broker的主从机子的数量,我自己猜有下面几种情况:
1.一主多从
2.双主双从(推荐)
为什么推荐使用双主双从?因为高可用
3.多主多从
3. 主从复制模式配置(主节点复制到从节点)
指的是消息在Broker主节点到从节点之间复制的过程是同步还是异步
1. 同步双写(又叫同步复制)
同步双写保证消息不会丢失,如果5s内未完成消息复制,则给生产者Producer返回结果:数据同步到Slave服务器超时 SendStatus.FLUSH_SLAVE_TIMEOUT
brokerRole = SYNC_MASTER
2. 异步双写(又叫异步复制,是默认方式)
异步复制性能高
brokerRole = ASYNC_MASTER
4. 数据刷盘模式配置(消息从内存到磁盘的过程)
指的是消息(包括主从)被写入内存的pagecache再存到磁盘的过程
1. 同步刷盘
偏向消息高可靠,如果5s内未完成刷盘,则给生产者Producer返回结果:刷盘超时 SendStatus.FLUSH_SLAVE_TIMEOUT
flushDiskType = SYNC_FLUSH
2. 异步刷盘(默认方式)
偏向性能,异步刷盘性能比较高
flushDiskType = ASYNC_FLUSH
4. 主从复制和刷盘模式的选择(两两组合四种情况)
- 异步复制,异步刷盘:如果偏向性能的话
- 同步双写,同步刷盘:如果偏向消息高可靠,不在乎性能
- 同步双写,异步刷盘:如果需求居中
- 异步双写,同步刷盘:啥玩意??忽略(都不给你加粗)
如果断电等瞬时故障导致主从同时宕机可能会丢失几条消息。正常情况异步复制一台机器发生故障不会丢失数据。
5. 双主双从模式下的故障分析情况
原文链接:RocketMq高可用部署/消息高可靠性方案以及故障模拟
题外话:问,双主之间是如何同步的??
TODO
6. NameServer和Broker的关系
- 每个Broker会和所有的NameServer保持长连接(通过心跳的方式)
1. 如何维持心跳
1. NameServer怎么做
nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接。
2. Broker怎么做
Broker每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。
RocketMQ如何保证消息不丢失(重要)
原文链接:面试官再问我如何保证 RocketMQ 不丢失消息,这回我笑了!
消息的生命周期
- 生产阶段:Producer生产消息,并通过网络把消息传给RocketMQ的Broker
- 存储阶段:消息通过同步、异步刷盘的方式存储在Broker端磁盘(CommitLog)中
- 消费阶段:Consumer将会从Broker拉取消息
1. 生产阶段如何保证消息不丢失
生产者(Producer) 通过网络发送消息(可以分为同步和异步)给Broker,当Broker收到之后,将会返回确认响应信息给 Producer。
1. Broker返回给Producer消息的类型(来自官方)
1. SendStatus.SEND_OK:消息发送成功
消息发送成功,但并不意味这是可靠的,如果要确保不会丢失任何消息,还应该启动同步Master的服务器(SYNC_MASTER)和同步刷盘操作(SYNC_FLUSH)
2. SendStatus.FLUSH_DISK_TIMEOUT:刷盘超时
消息发送成功,但服务器刷盘超时,此时消息已经进入了服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘的时间长度,如果Broker服务器设置了同步刷盘,当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回这个状态。
3. SendStatus.FLUSH_SLAVE_TIMEOUT:数据同步到Slave服务器超时
消息发送成功,但同步到Slave时超时,此时消息已经进入了服务器队列(内存),只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,且从机Broker未在同步刷盘时间内(默认为5s)完成与主服务器的同步,则将返回这个状态。
4. SendStatus.SLAVE_NOT_AVAILABLE 无Slave服务器可用
消息发送成功,但此时Slave不可用,如果Broker服务器的角色是同步Master,但没有配置Slave Broker服务器,则将返回该状态
2. Producer异步发送消息给Broker
RocketMQ 还提供异步的发送的方式,适合于链路耗时较长,对响应时间较为敏感的业务场景。
异步发送消息一定要注意重写回调方法,在回调方法中检查发送结果。
3. 设置重试次数(同步、异步都有)
不管是同步还是异步的方式,都会碰到网络问题导致发送失败的情况。针对这种情况,我们可以设置合理的重试次数,当出现网络问题,可以自动重试。
2. 存储阶段如何保证消息不丢失
1. 案例
- Master Broker挂了,异步刷盘还没完成,整体消息丢失
- Master Broker挂了,异步Slave复制还没完成,还未复制到Slave的消息丢失
2. 解决
- 设置同步刷盘(flushDiskType = SYNC_FLUSH)
- 设置同步复制(brokerRole = ASYNC_MASTER)
3. 分析
- 设置这2个同步,再加上生产者端的补偿机制(如重试),就可以完全保证消息不会丢失
- 如果非要设置为异步刷盘,好像没法解决了
4. 那么同步落盘怎么才能快
- 使用 FileChannel(文件管道??) + DirectBuffer 池(直接缓冲池??),使用堆外内存,加快内存拷贝
- 使用数据和索引分离,当消息需要写入时,使用 commitlog 文件顺序写,当需要定位某个消息时,查询index文件来定位,从而减少文件IO随机读写的性能损耗(啊?原来不就是这么操作的吗)
3. 消费阶段如何保证消息不丢失
消费者从Broker拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态给Broker。
如果 Broker未收到消费确认响应或收到其他状态(问:其他状态是啥?第二个状态收到了难道就不会再重试了吗?),消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。
1. Comsumer返回给Broker的消息类型(都整理到这里)
并发消费返回的状态
1. ConsumeConcurrentlyStatus.CONSUME_SUCCESS:消费成功
返回这个状态,告诉Broker,下游业务真的执行成功了,Broker可以把这个消息删掉了
2. ConsumeConcurrentlyStatus.RECONSUME_LATER:稍后再重试消费
返回这个状态,告诉Broker,等会会重新再消费,Comsumer下次还会继续消费这个消息:
步骤如下:
- 把这个消息先放到延迟队列中,设定的延时时间
- 延迟时间到了,再放到重试队列中
- 每隔一定时间(这个时间间隔可以自定义,不一定要一样)再去重试这条消息,默认最多重试16次
- 如果还是消费不了,就会进入死信队列
- 人工介入处理,或者启动一个定时任务去订阅这个死信队列完成后续的处理
有序消费返回的状态
1. ConsumeOrderlyStatus.SUCCESS:消费成功
2.ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT:稍后消费
返回这个状态,告诉Broker,先等一会,再继续处理这批消息(不会放到重试队列,也不会增加重试次数,就等一小会就行)
2. 存在的问题
可能会导致消息重复消费(Producer消费消息成功,但是因为网络原因,未能把成功标志返回给Broker,Broker侧判断不出Producer是否有消费消息,所以没有操作这条消息,Producer还是可以继续消费这条消息,导致重复消费),对于Producer。需要保证消费接口的幂等性(如何保证接口的幂等性,可以看我的另一篇文章 :【八股文】分布式篇)
RocketMQ如何保证消息的顺序性
为什么会出现乱序
Broker中的每个Topic都有多个Queue,写入消息的时候会平均分配(负载均衡机制,默认轮询,也可以自定义)给不同的Queue,假如我们有一个消费者组ComsumerGroup,这个消费组中的每一台机器都会负责一部分Queue,那么就会导致顺序的乱序问题
例子
Producer先后发送了2条消息,一条insert,一条update,分别分配到2台Queue中,消费者组中的两台机器分别处理这两个Queue中的消息,这时候顺序是无法保证的
如何解决
1. 保证Producer、Queue、Comsumer是一对一对一的关系
缺点
- 消息队列的吞吐量降低(绝对不容忍这样的情况发生)
- 如果Comsumer服务炸了,后面的消息就无法消费,被阻塞了
2. 把需要保持顺序消费的消息放到同一个Queue中,且让同一台机子处理
- 这一批顺序消息有共同的唯一ID,把唯一ID与队列的数量进行hash取余运算,保证这批消息进入到同一个队列
除此之外,还要考虑Comsumer消费失败的重试问题
使用有序消费,如果失败了会返回这个状态:ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT:稍后消费
如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?
分析
这一系列问题的本质其实就是消费端出问题了!!!(消费慢或者不消费了)
线上出问题怎么解决(修复Comsumer、紧急扩容)
思路:临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据
-
修复consumer的BUG,确保其恢复消费速度,然后将现有cnosumer都停掉
-
创建10倍新队列:新建一个topic,queue是原来的10倍,临时建立好原先10倍或者20倍的queue数量
-
写一个临时的分发数据的consumer程序:这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue
-
Comsumer10倍扩容:临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据
-
打完收工:等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息
消息过期失效了怎么办?(人工介入,半夜执行代码)
RabbitMQ的消息会有失效时间,写一个程序,把失效的消息ID找回来,等机子空闲的时候再手动塞进MQ里
消息队列满了怎么办?(快速消费,假消费)
快速消费掉所有消息,如假装消费(直接返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS),先把消息保存下来,等空闲的时候再手动塞进MQ里
如果让你设计一个消息队列,你会怎么设计架构
问题剖析
其实问的就是MQ有哪些核心点
- 有没有对RocketMQ消息队列做过较为深入的原理的了解,或者从整体了解把握住一个mq的架构原理
- 看看你的设计能力,给你一个常见的系统,就是消息队列系统,看看你能不能从全局把握一下整体架构设计,给出一些关键点出来
开放性问题,不写答案啦~