RabbitMQ
什么是MQ
MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
RabbitMQ的优点
由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言,社区活跃度高,更新频率快(但是商业版需要收费)
Rabbit的概念
RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
四大核心概念
生产者
- 产生数据发送消息的程序
消费者
- 消费者大多时候是一个等待接收消息的程序,生产者消费者很多时候都不在同一个机器,同一个机器可以即使生产者又是消费者
交换机
- 一方面接收生产者的消息,另一方面将消息推送到队列中,可以将消息推送到特定队列或者多个队列,或者将消息丢弃
队列
- RabbitMQ存储消息的一种数据结构,生产者发送的消息只能催存储在队列中,队列仅仅受主机内存以及磁盘的限制约束,本质上是一个很大的消息缓冲区,许多生产者可以将消息发送到一个队列,许多消费者可以从一个队列接收数据。
RabbitMQ的安装
-
官网地址
- https://www.rabbitmq.com/download.html
-
文件上传
- 上传到/usr/local/software 目录下(如果没有 software 需要自己创建)
-
安装文件(分别按照以下顺序安装)
- rpm -ivh erlang-21.3-1.el7.x86_64.rpm
- yum install socat -y
- rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
-
常用命令(按照以下顺序执行)
- 添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
- 启动服务
/sbin/service rabbitmq-server start
- 查看服务状态
/sbin/service rabbitmq-server status
- 停止服务(选择执行)
/sbin/service rabbitmq-server stop
- 开启 web 管理插件(在页面访问RabbitMQ的管理界面)r
abbitmq-plugins enable rabbitmq_management
- 添加开机启动 RabbitMQ 服务
-
访问RabbitMQ的管理界面 http://{安装RabbitMQ机器的IP地址}:15672/,用默认的账号密码guest会出现权限问题(关闭防火墙或开放端口)
- 此处添加新的用户访问
- 创建账号
rabbitmqctl add_user admin 123
- 设置用户角色
rabbitmqctl set_user_tags admin administrator
- 设置用户权限
set_permissions [-p <vhostpath>] <user> <conf> <write> <read> rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
- 创建账号
- 用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
当前用户和角色rabbitmqctl list_users
- 再次使用admin登陆。
- 此处添加新的用户访问
-
重置命令
- 关闭应用的命令为
rabbitmqctl stop_app
- 清除的命令为
rabbitmqctl reset
- 重新启动命令为
rabbitmqctl start_app
- 关闭应用的命令为
RabbitMQ的入门程序
1. 导入Maven依赖
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
2. 创建消息的生产者
public class Producer {
// 队列名称
public static final String QUERE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂IP,连接RabbitMQ队列(安装RabbitMQ机器的IP地址)
factory.setHost("xxx.xxx.xxx.xxx");
//用户名
factory.setUsername("username");
//密码
factory.setPassword("password");
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
/**
* 声明一个队列
* 1. 队列名称
* 2. 队列中的消息是否需要持久话(磁盘),默认情况消息存储在内存中
* 3. 该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费,false只能一个消费者消费
* 4. 是否自动删除,最后一个消费者断开连接后,该队列是否自动删除 true自动删除,false则相反
* 5. 其他参数
*/
channel.queueDeclare(QUERE_NAME,false,false,false,null);
String message = "hello world ";
/**
* 发送一个消息
* 1. 发送到那个哪个交换机
* 2. 路由的key值是哪个,本次是队列名称
* 3. 其他参数信息
* 4. 发送消息的消息体
*/
channel.basicPublish("",QUERE_NAME,null,message.getBytes());
}
}
3. 创建消息的消费者
/**
* 消费者,接受消息
*/
public class Consumer {
// 队列名称
public static final String QUERE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂IP,连接RabbitMQ队列
factory.setHost("xxx.xxx.xxx.xxx");
//用户名
factory.setUsername("username");
//密码
factory.setPassword("password");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println(new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1. 消费哪个队列
* 2. 消费成功之后是否要自动应答 true 代表自动 false 代表手动
* 3. 推送的消息如何进行消费的接口回调
* 4. 消费者取消消费的回调
*/
channel.basicConsume(QUERE_NAME,true,deliverCallback,cancelCallback);
}
}
4. 启动消费者与生产者查看结果
- 生产者Consumer输出hello world
- RabbitMQ管理页面有生成的新队列
RabbitMQ的工作队列
轮流接收消息
- 仿照上面的程序写两个消费者消费同一个队列或者启动两个工作线程,我这里使用idea启动两个工作线程,勾选下方的标识,允许启动多个线程
/**
* 测试一个工作线程(相当于一个消费者)
*/
public class Worker01 {
// 队列名称
public static final String QUERE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
xxxxxx上述的方式连接RabbitMQ,生成信道xxxxxxx
// 推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息:" + new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
System.out.println("c1等待接收消息。。。。。");
//消息接收
channel.basicConsume(QUERE_NAME, true, deliverCallback, cancelCallback);
}
}
- 编写生产者并启动
/**
* 生产者 发送大量消息
*/
public class Task01 {
public static final String QUERE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
xxxxxx上述的方式连接RabbitMQ,生成信道xxxxxxx
// 队列的声明
channel.queueDeclare(QUERE_NAME,false,false,false,null);
// 从控制台接收信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUERE_NAME,null,message.getBytes());
System.out.println("发送消息完成:"+message);
}
}
}
- 通过生产者发送多个消息,查看两个消费者的消费情况,通过程序执行发现生产者总共发送 6个消息,消费者 1 和消费者 2 分别分得3个消息,并且是按照有序的一个接收一次消息