AMQP协议的核心由exchange、message queue和binding构成
“exchange”接收发布应用程序发送的消息,并根据一定的规则将这些消息路由到“消息队列”。
“message queue”存储消息,直到这些消息被消费者安全处理完为止。
“binding”定义了exchange和message queue之间的关联,提供路由规则。
关系图如下:
适用范围
1. 基于erlang语言开发具有高可用高并发的优点,适合集群服务器。
2. 健壮、稳定、易用、跨平台、支持多种语言、文档齐全。
3. 有消息确认机制和持久化机制,可靠性高。
4. 开源
对比其他MQ的优势:
1. Apache ActiveMQ曝光率最高,但是可能会丢消息。
2. ZeroMQ延迟很低、支持灵活拓扑,但是不支持消息持久化和崩溃恢复。
单机安装配置
- 安装erlang:装完后在命令行中输入erl看是否安装成功
直接安装:apt-get install erlang
源码安装:
sudo apt-get install build-essential
sudo apt-get install libncurses5-dev
sudo apt-get install libssl-dev
sudo apt-get install m4
sudo apt-get install unixodbc unixodbc-dev
sudo apt-get install freeglut3-dev libwxgtk
sudo apt-get install xsltproc
sudo apt-get install fop
sudo apt-get install tk8.5
tar xzvf otp_src_19.1.tar.tar
cd otp_src_19.1
./configure --prefix=/home/hansuo/erlang
make
make install
vim .profile
export PATH=/home/hansuo/erlang/bin:$PATH
source .profile
- cd /opt/rabbitmq_server-3.6.3/sbin
- ./rabbitmq-server start【ERROR: epmd error for host nu5i12294: address (cannot connect to host/port),在/etc/hosts文件中添加主机名】
- ./rabbitmqctl status
- ./rabbitmqctl stop【关闭服务】
- ./rabbitmq-plugins list【组件列表】
- ./rabbitmq-plugins enable/disable rabbitmq_management[plugin-name]【启/禁组件,WEB管理,websocket(stomp,端口15674)等】
- 通过 http://rabbitmq-server-ip:15672,和guest:guest的用户名密码就能登录管理页面【3.3.0版本后guest用户仅限本地登录】
- 新增用户:rabbitmqctl add_user Username Password
- 删除用户:rabbitmqctl delete_user Username
- 修改密码:rabbitmqctl change_password Username Newpassword
- 查看用户:rabbitmqctl list_users
- 设置角色:rabbitmqctl set_user_tags User Tag【User为用户名, Tag为角色名(administrator,monitoring,policymaker,management,或其他自定义名称),同一用户可设置多个角色,仅仅设置角色(即使是administrator角色)也没有访问权限(程序连接会出现connection reset错误),还需设置权限(可在管理界面Admin页中设置)】
- 查看vhost:rabbitmqctl list_vhosts
- 查看(指定hostpath)所有用户的权限:rabbitmqctl list_permissions [-p VHostPath]
- 查看指定用户的权限:rabbitmqctl list_user_permissions User
- 设置用户权限:rabbitmqctl set_permissions –p VHostPath User ConfP WriteP ReadP
- 清除用户权限:rabbitmqctl clear_permissions [-p VHostPath] User
- 使用websocket时,前端js使用方式如下:
19.1下载stomp.min.js(RabbitMQ官网有提供)
19.2 js代码
<html>
<head>
<title>RabbitMQ Web STOMP Examples : Echo Server</title>
<meta charset="UTF-8">
<script src="js/stomp.min.js"></script>
</head>
<script>
var client = Stomp.client('ws://localhost:15674/ws');
var on_connect = function(x) {
//paynotify是交换机,/exchange是固定写法
id = client.subscribe("/exchange/paynotify", function(d) {
alert(d.body);
});
};
var on_error = function() {
console.log('error');
};
//账号 密码 接到消息执行的方法 异常执行方法 路由
client.connect('guest', 'guest', on_connect, on_error, '/');
</script>
</body>
</html>
19.3服务端发消息
@Autowired
private StringRedisTemplate redisTemplate;
public void run(){
//发一个123的消息
rabbitTemplate.convertAndSend(paynotify,"","123");
}
这样浏览器就可以接受服务端的消息了
19.4 destination 在 RabbitMQ Web STOM 中进行了相关的定义,根据使用场景的不同,主要有以下 4 种:
1./exchange/<exchangeName>
对于 SUBCRIBE frame,destination 一般为/exchange/<exchangeName>/[/pattern] 的形式。该 destination 会创建一个唯一的、自动删除的、名为<exchangeName>的 queue,并根据 pattern 将该 queue 绑定到所给的 exchange,实现对该队列的消息订阅。
对于 SEND frame,destination 一般为/exchange/<exchangeName>/[/routingKey] 的形式。这种情况下消息就会被发送到定义的 exchange 中,并且指定了 routingKey。
2./queue/<queueName>
对于 SUBCRIBE frame,destination 会定义<queueName>的共享 queue,并且实现对该队列的消息订阅。
对于 SEND frame,destination 只会在第一次发送消息的时候会定义<queueName>的共享 queue。该消息会被发送到默认的 exchange 中,routingKey 即为<queueName>。
3./amq/queue/<queueName>
这种情况下无论是 SUBCRIBE frame 还是 SEND frame 都不会产生 queue。但如果该 queue 不存在,SUBCRIBE frame 会报错。
对于 SUBCRIBE frame,destination 会实现对队列<queueName>的消息订阅。
对于 SEND frame,消息会通过默认的 exhcange 直接被发送到队列<queueName>中。
4./topic/<topicName>
对于 SUBCRIBE frame,destination 创建出自动删除的、非持久的 queue 并根据 routingkey 为<topicName>绑定到 amq.topic exchange 上,同时实现对该 queue 的订阅。对于 SEND frame,消息会被发送到 amq.topic exchange 中,routingKey 为<topicName>。
集群搭建
消息处理方式
1、Clustering:不支持跨网段,各节点需运行同版本的Erlang和RabbitMQ, 应用于同网段局域网。
2、Federation:允许单台服务器上的Exchange或Queue接收发布到另一台服务器上Exchange或Queue的消息, 应用于广域网。
3、Shovel:与Federation类似,但工作在更低层次。
RabbitMQ对网络延迟很敏感,在LAN环境建议使用clustering方式;在WAN环境中,则使用Federation或Shovel。我们平时说的RabbitMQ集群,说的就是clustering方式,它是RabbitMQ内嵌的一种消息处理方式,而Federation或Shovel则是以plugin形式存在。
集群模式
▲RabbitMQ集群结构
- 普通模式(默认):
上图是由3个节点(Node1,Node2,Node3)组成的RabbitMQ普通集群环境,Exchange A的元数据信息在所有节点上是一致的;而Queue的完整信息只有在创建它的节点上,各个节点仅有相同的元数据,即队列结构。
当producer发送消息到Node1节点的Queue1中后,consumer从Node3节点拉取时,RabbitMQ会临时在Node1、Node3间进行消息传输,把Node1中的消息实体取出并经过Node3发送给consumer。
该模式存在一个问题:当Node1节点发生故障后,Node3节点无法取到Node1节点中还未被消费的消息实体。如果消息没做持久化,那么消息将永久性丢失;如果做了持久化,那么只有等Node1节点故障恢复后,消息才能被其他节点消费。
2、镜像模式(基于镜像队列):
它是在普通模式的基础上,把需要的队列做成镜像队列,存在于多个节点来实现高可用(HA)。该模式解决了上述问题,Broker会主动地将消息实体在各镜像节点间同步,在consumer取数据时无需临时拉取。
该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被大量消耗。通常地,对可靠性要求较高的场景建议采用镜像模式。
节点类型
1、RAM Node(内存节点):将所有的Virtual Host、Queue、Exchange、Binding、User等的元数据存储在内存中,因此性能比较出色。
2、DISC Node(磁盘节点): 将元数据存储在磁盘中。
注:RabbitMQ单节点环境只允许是磁盘节点,防止重启RabbitMQ时丢失系统的配置信息。RabbitMQ集群环境至少要有一个磁盘节点,因为当节点加入或者离开集群时,必须要将该变更通知到至少一个磁盘节点。
搭建普通集群
- 修改/etc/hosts文件,配置主机名。
- 安装所有rabbitmq节点。
- 先启动一个rabbitmq节点,并将$HOME/.Erlang.cookie文件(必须保持一致)复制到其他节点相同目录下,再启动所有rabbitmq节点。
- 停止所有节点上的RabbitMQ服务,然后以detached方式独立运行:# rabbitmqctl stop; # rabbitmq-server -detached
- 将两台从节点加入到主节点:# rabbitmqctl stop_app; # rabbitmqctl join_cluster --ram rabbit@rabbit01【不加ram参数则为磁盘节点】; # rabbitmqctl start_app
- 查看各节点的集群信息:# rabbitmqctl cluster_status
- 如果需要将某个节点从集群中移除,使其变回独立节点,可以使用以下命令:# rabbitmqctl stop_app; # rabbitmqctl reset; # rabbitmqctl start_app
- 如果需要启停某个节点来进行维护,可以使用以下命令:# rabbitmqctl stop; #rabbitmq-server -detached
- 当新的节点加入到集群之后,其用户信息也被重置了(之前新增的admin账户不见了),需要重新配置管理员用户,以便访问RabbitMQ管理页面(在任意节点添加用户,会自动同步到各个集群节点)