一、介绍
1.1 消息队列MQ
MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。是系统和系统通信的一种方式。
为什么使用MQ
在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
开发中消息队列通常有如下应用场景:
任务异步处理 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
能解决问题:1 耗时操作 2.解耦合 3.限流(削峰填谷)
1.2 实现MQ的大致有两种主流方式:AMQP、JMS
- AMQP:AMQP高级消息队列协议,是一个进程间传递异步消息的网络协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
- JMS:JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
AMQP 与 JMS 区别
-
JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
-
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
-
JMS规定了两种消息模式;而AMQP的消息模式更加丰富
JMS ①订阅模式 ②点对点消息模式
1.3. 消息队列产品
目前市面上成熟主流的MQ有Kafka 、RocketMQ、RabbitMQ,我们这里对每款MQ做一个简单介绍。
Kafka
Apache下的一个子项目,使用scala实现的一个高性能分布式Publish/Subscribe消息队列系统。
快速持久化:通过磁盘顺序读写与零拷贝机制,可以在O(1)的系统开销下进行消息持久化;
高吞吐:在一台普通的服务器上既可以达到10W/s的吞吐速率;
高堆积:支持topic下消费者较长时间离线,消息堆积量大;
完全的分布式系统:Broker、Producer、Consumer都原生自动支持分布式,依赖zookeeper自动实现复杂均衡;
支持Hadoop数据并行加载:对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。
RocketMQ
RocketMQ的前身是Metaq,当Metaq3.0发布时,产品名称改为RocketMQ。RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点 :
能够保证严格的消息顺序
提供丰富的消息拉取模式
高效的订阅者水平扩展能力
实时的消息订阅机制
支持事务消息
亿级消息堆积能力
RabbitMQ
使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6种模式:
- 简单模式,
- work模式,
- Publish/Subscribe发布与订阅模式,
- Routing路由模式,
- Topics主题模式,
- RPC远程调用模式(远程调用,不太算MQ;不作介绍);
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
三种产品对比:
- kafka 性能好 吞吐量高 100w 丢数据 延迟性高(毫秒)
- rocketmq 性能好 吞吐量高 50w 理论上不丢数据 延迟性高(毫秒)
- rabbitmq 性能相比低 吞吐量 低 10W 丢数据 延迟性低(微秒)
生产者消费者模型:
二、RabbitMQ工作模式
2.1 Work queues工作队列模式
与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
2.2 Publish/Subscribe发布与订阅模式
订阅模式示例图:
前面2个案例中,只有3个角色:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
Queue:消息队列,图中红色部分
而在订阅模型中,多了一个exchange角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- opic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
2.3 Routing路由模式
路由模式特点:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
图解:
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
2.4Topics通配符模式
模式说明
Topic
类型与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候**使用通配符**!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.insert.abc
或者 item.insert
item.*
:只能匹配item.insert
图解:
- 红色Queue:绑定的是
usa.#
,因此凡是以usa.
开头的routing key
都会被匹配到 - 黄色Queue:绑定的是
#.news
,因此凡是以.news
结尾的routing key
都会被匹配
2.5 总结
推荐3-6
- 简单模式 一个生产者发送一个消息 一个消费者来消费,消费完了之后消息就没.
- 工作模式 一个生产者发送多个消息 ,多个消费者来消费不同的消息,消费完了消息就没了
- 交换机:只做消息的转发,不做消息的存储,一旦队列没有绑定到交换机,消息被生产者发送过来,就会丢失。
- 广播模式 一个生产者发送一个消息,多个消费者都可以消费同一个消息
- 路由模式 一个生产者发送一个消息,需要指定routingkey ,交换机和队列 通过routingkey进行绑定,如果指定的routingkey 和绑定的routingkey 一致 则将消息转发给对应的队列中
- 主题模式 生成者发送一个消息,需要指定routingkey ,交换机和队列 通过通配符的方式 进行绑定,如果指定的routingkey符合绑定的通配符,则将消息转发到对应的队列中。
2.6 代码部分
为了测试方便: 建立一个工程 作为生产者和消费者。
1.parent 起步依赖
2.启动类
3.发送消息---》controller 触发发送消息的动作
RabbitTemplate发送
在启动类中创建队列 创建交换机 创建绑定
配置yml
4.监听消息---》
在启动类中创建队列 创建交换机 创建绑定
配置yml
创建一个类 通过注解的方式来监听
发送消息:
//启动类
@Component
public class MessageListener {
@RabbitListener(queues = "springboot_topic_queue")
public void getmessage(String msg){
System.out.println(msg);
}
}
@Component
public class RabbitMQConfig {
//1.创建队列
@Bean
public Queue queue(){
return new Queue("springboot_topic_queue");
}
//2.创建交换机
@Bean
public TopicExchange exchangge(){
return new TopicExchange("springboot_topic_exchange");
}
//3.创建绑定
@Bean
public Binding createBinding(){
return BindingBuilder.bind(queue()).to(exchangge()).with("order.*");
}
}
//controller层
@RequestMapping("/order")
@RestController
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/add")
public String addOrder() {
//1.模拟下单
System.out.println("================下单================");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("================下单成功================");
//2.发送消息(创建队列,创建交换机,绑定)
rabbitTemplate.convertAndSend("springboot_topic_exchange", "order.insert", "消息本身 insert");
rabbitTemplate.convertAndSend("springboot_topic_exchange", "order.delete", "消息本身 delete");
return "success";
}
}
监听消息:
@Component
public class MessageListener {
@RabbitListener(queues = "springboot_topic_queue")
public void getmessage(String msg){
System.out.println(msg);
}
}