rabbitMQ 安装和简单的使用
docker 安装 rabbitMQ
- 搜索镜像
docker search rabbitmq
- 获取镜像
docker pull rabbitmq
- 查看镜像
docker images
- 启动rabbitMQ容器
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
- 进入容器
docker exec -it rabbit /bin/bash
注意:
docker进入容器,修改配置,重启容器
cd /etc/rabbitmq/conf.d/ //rabbitmq配置文件
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
6. 安装rabbitMQ插件
rabbitmq-plugins enable rabbitmq_management
- 访问rabbitMQ web 页面,访问成功表示安装成功
账号密码为:guest
入门案例
MQ概述
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统
之间进行通信。
-
应用之间的远程调用
-
加入MQ后应用之间的调用
MQ的优势
1. 应用解耦
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
-
系统耦合性越高,容错性就越低,可维护性就越低。
-
使用MQ使得应用之间解耦,提升容错性和可维护性。
2. 任务异步处理
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响
应时间。
- 一个下单操作耗时:20 + 300 + 300 + 300 = 920ms,用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!
- 用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。提升用户体验和系统吞吐量(单位时间内处理请求的数目)。
3. 削峰填谷
如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易
宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了。
消息被MQ保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒1000个消息,这样慢慢
写入数据库,这样就不会卡死数据库了。
但是使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在
MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会
维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”
MQ的劣势
系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高
可用?
系统复杂程度提高
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。
如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
一致性问题
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息,如果 B 系统、C 系统处理成功,D 系统处理
失败。如何保证消息数据处理的一致性?
RabbitMQ
RabbitMQ官方地址:http://www.rabbitmq.com/
2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开
发。Erlang 语言专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
RabbitMQ 基础架构如下图:
RabbitMQ 中的相关概念:
- Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
- Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类
似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务
时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等 - Connection:publisher/consumer 和 broker 之间的 TCP 连接
- Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP
Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,
如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含
了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。
Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销 - Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发
消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and
fanout (multicast) - Queue:消息最终被送到这里等待 consumer 取走
- Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍);
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
RabbitMQ入门
- 简单模式
在上图的模型中,有以下概念: - P:生产者,也就是要发哦送消息的程序
- C:消费者,消息的接收者,会一直等待消息的到来
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从中取出消息
搭建示例工程
-
创建普通maven项目,不使用maven骨架搭建
-
添加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
- 编写生产者
编写用到的工具类
package com.wz.rabbitmq.simple.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author wz
* @Date 2022/3/12 21:11
* @Version 1.0
*/
public class ConnectionUtils {
public static Connection getConnection() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址;默认为 localhost
connectionFactory.setHost("192.168.188.121");
//连接端口;默认为 5672
connectionFactory.setPort(5672);
//虚拟主机名称;默认为 /
connectionFactory.setVirtualHost("/wz");
//连接用户名;默认为guest
connectionFactory.setUsername("wz");
//连接密码;默认为guest
connectionFactory.setPassword("guest");
//创建链接
Connection connection = connectionFactory.newConnection();
return connection;
}
}
编写生产者com.xxx.rabbitmq.simple.Producer
package com.wz.rabbitmq.simple.simple;
import com.rabbitmq.client.*;
import com.wz.rabbitmq.simple.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author wz
* @Date 2022/3/12 20:37
* @Version 1.0
*/
public class MyConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接
Connection connection = ConnectionUtils.getConnection();
//创建频道
Channel channel = connection.createChannel();
//声明队列(创建队列)
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接,只能有一个Consumer监听这个队列
* 参数4:是否在不使用的时候自动删除队列,当没有Consumer时,自动删除
* 参数5:队列其它参数
*/
channel.queueDeclare(MyProducer.QUEUE_NAME, true, false, false, null);
//接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* @param consumerTag 消费者标签
* @param envelope 消息的主体内容
* @param properties 消息的配置属性
* @param body 消息体
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//接收到消息后的回调
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息为:" + new String(body, "utf-8"));
}
};
/**
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复
会删除消息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(MyProducer.QUEUE_NAME, true, consumer);
//释放资源(释放链接)不能关闭链接,当有消息就要获取消息
// channel.close();
// connection.close();
}
}
在执行上述的消息发送之后;可以登录rabbitMQ的管理控制台,可以发现队列和其消息:
4. 编写消费者
package com.wz.rabbitmq.simple.simple;
import com.rabbitmq.client.*;
import com.wz.rabbitmq.simple.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author wz
* @Date 2022/3/12 20:37
* @Version 1.0
*/
public class MyConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接
Connection connection = ConnectionUtils.getConnection();
//创建频道
Channel channel = connection.createChannel();
//声明队列(创建队列)
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接,只能有一个Consumer监听这个队列
* 参数4:是否在不使用的时候自动删除队列,当没有Consumer时,自动删除
* 参数5:队列其它参数
*/
channel.queueDeclare(MyProducer.QUEUE_NAME, true, false, false, null);
//接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* @param consumerTag 消费者标签
* @param envelope 消息的主体内容
* @param properties 消息的配置属性
* @param body 消息体
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//接收到消息后的回调
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息为:" + new String(body, "utf-8"));
}
};
/**
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复
会删除消息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(MyProducer.QUEUE_NAME, true, consumer);
//释放资源(释放链接)不能关闭链接,当有消息就要获取消息
// channel.close();
// connection.close();
}
}
小结
上述的入门案例中使用如下的简单模式:
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费
者从其中取出消息。