0
点赞
收藏
分享

微信扫一扫

《分布式消息中间件实践》 读书笔记


写在前面

  • 听人家总说这些,之前项目里也没有用到,其实挺好奇的,之前也没时间看。这里准备用这本书入门,查了一下,书评很一般,但是入门应该够了,而且200多页,正合适,生活加油
  • 笔记主要是书的摘写,不懂的地方写一些自己的理解。
  • 嗯,看了一些发现东西挺多,是我唐突了…,不简单。
  • 笔记还在更新中。

夫人之相与,俯仰一世,或取诸怀抱,悟言一室之内;或因寄所托,放浪形骸之外--------《兰亭集序》——东晋·王羲之

第1章消息队列

1.1 系统间通信技术介绍

  • 如果是一个业务被拆分成多个子业务部署在不同的服务器上,那就是​​分布式应用​​;
  • 如果是同一个业务部署在多台服务器上,那就是​​集群​​。

分布式应用的子系统之间并不是完全独立的,它们需要相互通信来共同完成某个功能,这就涉及​​系统间通信​​了。

目前,业界通常有两种方式来实现系统间通信,

  • 基于远程过程调用的方式(RPC调用);
  • 基于消息队列的方式。

​RPC​​​是一种​​通过网络从远程计算机程序上请求服务​​​,而不需要了解底层网络技术的​​协议​​。

  • 它是协议,是一种规范,就需要有遵循这套规范的实现。​​典型的RPC实现​​​包括​​Dubbo​​​、​​Thrift​​​、 ​​GRPC​​等。
  • 网络通信的实现是透明的,调用方不需要关心网络之间的通信协议、网络IO模型、通信的信息格式等。
  • 跨语言,调用方实际上并不清楚对端服务器使用的是什么程序语言。对于调用方来说,无论其使用的是何种程序语言,调用都应该成功,并且返回值也应按照调用方程序语言能理解的形式来描述。

​基于消息队列​​​的方式是指由应用中的​​某个系统负责发送信息​​​,由关心这条消息的​​相应系统负责接收消息​​​,并在收到消息后进行各自系统内的业务处理。消息可以非常简单,​​比如只包含文本字符串;也可以很复杂,比如包含字节流、字节数组,还可能包含嵌入对象,甚至是Java·对象(经过序列化的对象)。​

《分布式消息中间件实践》 读书笔记_服务器

1.2 为何要使用消息队列

消息队列的典型场景-异步处理,解耦、流量削峰、日志收集、事务最终一致性等问题。

  • 异步处理:这个可以结合AJAX理解,比如AJAX通过异步请求,这个异步请求是可以随时发出的,那么在Tomcat里就有相应的工作队列来存储请求信息。
  • 模块解耦:随着需求的叠加,各模块之间逐渐变成了相互调用的关系,这种​​模块间紧密关联的关系就是紧耦合​​​。紧耦合带来的问题是对​​一个模块的功能变更将导致其关联模块发生变化​​​,因此各个模块难以​​独立演化​​​。要解决这个问题,可以在​​模块之间​​​调用时增加一个​​中间层来实现解耦​​​,这也方便了以后的扩展。所谓解耦,简单地讲,就是一个模块只关心自己的核心流程,而依赖该模块执行结果的其他模块如果做的不是很重要的事情,有​​通知即可,无须等待结果​​。换句话说,基于消息队列的模型,关心的是通知,而非处理。
  • 流量削峰 某一时刻网站突然迎来​​用户请求高峰期​​​的情况,如果在设计上考虑不周甚至会发生雪崩(在分布式系统中,经常会出现某个基础服务不可用造成整个系统不可用的情况,这种现象被称为“服务雪崩效应”),从而发生整个系统不可用的严重生产事故。当访问量剧增时系统依然可以继续使用,该怎么做呢?首先想到的是​​购买更多的服务器进行扩展​​​,以增强系统处理并发请求的能力。如果都以能处理此类流量峰值为标准投入大量资源随时待命无疑是很大的浪费。在业界的诸多实践中,常见的是使用​​消息队列​​​,​​先将短时间高并发的请求持久化,然后逐步处理,从而削平高峰期的并发流量,改善系统的性能​
  • 日志收集 利用消息队列产品在​​接收和持久化消息​​​方面的高性能,​​引入消息队列快速接收日志消息​​​,避免因为​​写入日志时的某些故障导致业务系统访问阻塞​​​、​​请求延迟​​​等。所以很多公司会选择构建一个​​日志收集系统​​​,由它来​​统一收集业务日志数据​​​,供​​离线和在线的分析系统使用​​。
  • 事务最终一致性 :业界曾经提出过一个处理分布式事务的规范-XAXA主要定义了全局事务管理器(Transaction Manager)和局部资源管理器(Resource Manager)之间的接口.XA接口是双向的系统接口,在事务管理器及一个或多个资源管理器之间形成通信桥梁。XA引入的事务管理器充当全局事务中的协调者的角色。​​事务管理器​​​控制着全局事务,管理事务生命周期,并协调资源。​​资源管理器​​​负责控制和管理实际资源(如数据库或JMS队列)。目前各主流数据库都提供了对XA规范的支持。XA所以它的最大缺陷是​​性能很差​​​,因此并不适合在生产环境下有​​高并发和高性能​​要求的场景。在业界的很多实践方案中,都可以借助消息队列来处理此问题。

1.3 消息队列的功能特点

一个典型意义上的消息队列,至少需要包含​​消息的发送、接收和暂存功能​

《分布式消息中间件实践》 读书笔记_客户端_02

在生产环境应用中,对​​消息队列的要求​​​远不止基本的消息发送、接收和暂存。在不同的业务场景中,需要消息队列产品能解决诸如​​消息堆积、消息持久化、可靠投递、消息重复、严格有序、集群​​等各种问题。

  1. 消息堆积:消息在处理中心逐渐积压而得不到释放。比如给​​消息队列设置一个阈值​​,将超过阈值的消息不再放入处理中心,以防止系统资源被耗尽,导致机器挂掉甚至整个消息队列不可用。
  2. 消息持久化: 将消息放在​​内存中​​​存在的最大问题是,一旦​​机器宿掉​​消息将丢失。持久化方案有很多种,比如将消息存到本地文件、分布式文件系统、数据库系统中等。
  3. 可靠投递:可靠投递是不允许存在消息丢失的情况的。从消息的整个生命周期来分析,消息丢失的情况一般发生在如下过程中:
  • 从生产者到消息处理中心。
  • 从消息处理中心到消息消费者。
  • 消息处理中心持久化消息。
  1. 消息重复:为了支持消息可靠投递,当消息发送失败或者不知道是否发送成功时(比如超时),消息的状态是待发送,定时任务不停地轮询所有的待发送消息,最终保证消息不会丢失,这就带来了消息可能会重复的问题。
  2. 严格有序 : 在实际的业务场景中,经常会碰到需要​​按生产消息时的顺序来消费的情形​​​。需要消息队列能够提供​​有序消息​​​的保证。但​​顺序消费​​​却不一定需要消息在整个产品中​​全局有序​​​,有的产品可能只需要提供​​局部有序​​的保证。
  3. 集群:系统架构一般都需要​​实现高可用性​​​,以排除​​单点故障​​​引起的服务中断,保证7x24小时不间断运行,所以可能需要​​消息队列​​​产品提供对​​集群模式​​​的支持。集群不仅可以让消费者和生产者在某个节点崩溃的情况下继续运行,集群之间的多个节点还能够​​共享负载​​​,当某台机器或网络出现故障时能​​自动进行负载均衡​​​,而且可以通过增加​​更多的节点​​​来提高​​消息通信​​​的​​吞吐量。​
  4. 消息中间件:消息中间件关注于​​数据的发送和接收,利用高效、可靠的异步消息传递机制集成分布式系统​​​。消息传输中间件(MOM)简化了​​应用之间数据的传输​​​,​​屏蔽​​​了底层的异构操作系统和网络平台,提供了一致的​​通信和应用开发标准​​​,确保在​​分布式​​​计算网络环境下可靠、跨平台的信息传输和数据交换。它基于​​消息队列​​​的​​存储-转发​​​机制,并提供了​​特有的异步传输机制​​​,能够基于​​消息传输​​​和​​异步事务处理​​​实现应用​​整合与数据交换。​

中间件:非底层操作系统软件、非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为​​中间件​​。

1.4 设计一个简单的消息中间件

1.4.1 消息处理中心

实现一个消息队列。代码详见​​https://gitee.com/liruilonger/workspack/tree/master/src/com/msg_queue/jkd​​

package com.msg_queue.jkd;

import java.util.concurrent.ArrayBlockingQueue;

/**
* @Classname Broker
* @Description TODO 消息处理中心类( Broker )
* @Date 2021/7/5 0:37
* @Created Li Ruilong
*/
public class Broker {

///队列存储消息的最大数量
private final static int MAX_SIZE = 3;

//保存消息数据的容
private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);

// 生产消息
public static void produce(String msg){
if (messageQueue.offer(msg)){
System.out.printf("投递成功%s,当前暂存消息数量%d",msg,messageQueue.size());
}else {
System.out.println("消息处理中心内暂存的消息达到最大负荷,不能放入消息");
}

}

//消费消息
public static String consume(){
//
String msg = messageQueue.poll();
if (msg != null){
System.out.printf("已经消费消息 %s,当前暂存消息数量 %s",msg,messageQueue.size());
}else {
System.out.println("消息处理中心没有消息可供消费");
}
return msg;
}
}

BrokerServer用来对提供Broker类的服务。

/**
* @Classname BrokerServer
* @Description TODO 定义了BrokerServer类用来对外提供Broker类的服务。
* @Date 2021/7/5 12:32
* @Created Li Ruilong
*/
public class BrokerServer implements Runnable{

public static int SERVICE_PORT = 9999;

private final Socket socket;

//该Socket是由客户端请求的得到的Socket实例。
public BrokerServer(Socket socket) {
this.socket = socket;
}


@Override
public void run() {
try(
// 拿到输入流
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
// 拿到输出流
PrintWriter out = new PrintWriter(socket.getOutputStream())
){
while (true){
// 拿到输入的数据
String str = in.readLine();
if (str == null){
continue;
}
System.out.println("接受原始消息"+str);
// CONSUME 表示消费一条消息
if ("CONSUME".equals(str)){
// 消费消息
String s = Broker.consume();
out.println(s);
out.flush();
}else {
// 其他情况表示生产消息放到消息队列里面
Broker.produce(str);
}
}
}catch (Exception e){
e.printStackTrace();
}
}

public static void main(String[] args) throws Exception {
// 服务端套接字,监听9999 端口
ServerSocket serverSocket = new ServerSocket(SERVICE_PORT);
while (true){
// todo serverSocket.accept()接受客户端Socket连接请求,并返回一个与客户端Socket对应的Socket实例,该方法是一个阻塞方法,
// 如果没有接受到客户端发送的Socket,则一直处于等待状态,线程也会被阻塞。
BrokerServer server = new BrokerServer(serverSocket.accept());
new Thread(server).start();
}
}
}

1.4.2 客户端访问

/**
* @Classname MqClient
* @Description TODO 客户端
* @Date 2021/7/5 13:36
* @Created Li Ruilong
*/
public class MqClient {


/*
* @return
* @Description 消息生产者
* @author Liruilong
* @date 2021/7/5 14:06
**/
public static void produce(String message) throws Exception {
// 利用Socket模拟发送消息的一方。
Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
try (
PrintWriter out = new PrintWriter(socket.getOutputStream())
){
out.println(message);
out.flush();
}

}


/*
* @return
* @Description 消息消费者
* @author Liruilong
* @date 2021/7/5 15:20
**/
public static String consume() throws Exception{
// 利用Socket模拟消费消息的一方。
Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
try (
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream());
){
out.println("CONSUME");
out.flush();
String message = in.readLine();

return message;
}
}

}

生产消息

/**
* @Classname ProduceClient
* @Description TODO 生产消息
* @Date 2021/7/5 14:36
* @Created Li Ruilong
*/
public class ProduceClient {

public static void main(String[] args) throws Exception {
MqClient client = new MqClient();
client.produce("Hello Word");

}
}

消费消息

/**
* @Classname ConsumeClient
* @Description TODO 消费消息
* @Date 2021/7/5 14:39
* @Created Li Ruilong
*/
public class ConsumeClient {
public static void main(String[] args) throws Exception {
MqClient mqClient = new MqClient();
String consume = mqClient.consume();
System.out.println("获取的消息为:"+consume);
}
}

第2章消息协议

类似于上免得​​CONSUME​​,用与区分连接Socket的是生产消息的客户端还是消费消息的客户端。

《分布式消息中间件实践》 读书笔记_分布式_03

​消息协议​​​则是指用于​​实现消息队列功能时所涉及的协议​​​。按照是否向​​行业开放​​​消息​​规范文档​​​,可以将消息协议分为​​开放协议​​​和​​私有协议​​。

常见协议有AMOP, MQTT, STOMP,XMPP等。有些特殊框架(如Redis, Kafka, ZeroMQ)根据自身需要未严格遵循MQ规范,而是基于TCP/IP自行封装了一套协议,通过网络Socket接口进行传输,实现了MQ的功能。

这里的协议可以简单地理解成对双方通信的一个约定.

2.1 AMQP

在2004年,摩根大通和iMatrix开始着手​​Advanced Message Queuing Protocol (AMQP)​​​开放标准的开发。​​2006​​​年,发布了​​AMQP规范​​​。目前​​AMQP​​​协议的版本为​​1.0​​。

一般来说,将AMQP协议的内容分为三部分:​基本概念​​功能命令​​传输层协议​

  • 基本概念是指​​AMQP内部定义的各组件及组件的功能说明​​。
  • 功能命令是指该协议所定义的一系列命令,应用程序可以基于这些命令来实现相应的功能。
  • 传输层协议是一个网络级协议,它定义了数据的传输格式,消息队列的客户端可以基于这个协议与消息代理和AMQP的相关模型进行交互通信,该协议的内容包括​​数据帧处理、信道复用、内容编码、心跳检测、数据表示和错误处理​​等。

主要概念

  • Message (消息):消息服务器所处理数据的​​原子单元​​​。消息可以携带​​内容​​​,从格式上看,消息包括一个​​内容头​​​、​​一组属性​​​和​​一个内容体​​​。这里所说的消息可以对应到许多不同​​应用程序的实体​​​,比如一个​​应用程序级消息​​​、一个​​传输文件​​​、一个​​数据流帧​​等。消息可以被保存到磁盘上,这样即使发生严重的网络故障、服务器崩溃也可确保投递消息可以有优先级,高优先级的消息会在等待同一个消息队列时在低优先级的消息之前发送,当消息必须被丢弃以确保消息服务器的服务质量时,服务器将会优先丢弃低优先级的消息。消息服务器不能修改所接收到的并将传递给消费者应用程序的消息内容体。消息服务器可以在内容头中添加额外信息,但不能删除或修改现有信息。
  • ​Publisher (消息生产者)​​​:也是一个​​向交换器发布消息​​​的​​客户端​​应用程序。
  • ​Exchange (交换器)​​​:用来接收消息生产者所发送的消息并将这些消息路由给服务器中的​​队列​​。
  • ​Binding (绑定)​​​:用于​​消息队列和交换器之间的关联​​​。一个绑定就是基于​​路由键​​​将​​交换器和消息队列连接起来的路由规则​​​,所以可以将交换器理解成一个由绑定构成的​​路由表​​(路由控制表)。
  • ​Virtual Host (虚拟主机)​​​:它是​​消息队列以及相关对象​​的集合,是共享同一个身份验证和加密环境的独立服务器域。每个虚拟主机本质上都是一个mini版的消息服务器,拥有自己的队列、交换器、绑定和权限机制。
  • ​Broker (消息代理)​​​:表示​​消息队列服务器​​​,接受客户端连接,实现​​AMQP消息队列和路由功能的过程​​。
  • ​Routing Key (路由规则)​​:虚拟机可用它来确定如何路由一个特定消息。
  • ​Queue (消息队列)​​:用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可被投入一个或多个队列中。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  • ​Connection (连接)​​​:可以理解成客户端和消息队列服务器之间的一个​​TCP连接​​。
  • ​Channel (信道)​​​:仅仅当创建了连接后,若客户端还是不能发送消息,则需要为连接​​创建一个信道​​​。​​信道​​​是一条独立的​​双向数据流通道​​​,它是建立在真实的​​TCP连接内的虚拟连接​​​,​​ AMQP命令​​​都是通过​​信道​​​发出去的,不管是​​发布消息、订阅队列还是接收消息​​​,它们都通过信道完成。​​一个连接​​​可以包含​​多个信道​​​,之所以需要​​信道​​​,是因为​​TCP连接​​​的​​建立​​​和​​释放​​都是十分昂贵的,如果客户端的每一个线程都需要与消息服务器交互,如果每一个线程都建立了一个TCP连接,则暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。
  • ​Consumer (消息消费者)​​:表示一个从消息队列中取得消息的客户端应用程序。

核心组件的生命周期

(1)消息的生命周期一条消息的流转过程通常是这样的:

  • ​Publisher​​​产生一条数据,发送到​​Broker​​​, ​​Broker​​​中的​​Exchange​​​可以被理解为一个规则表(Routing Key和Queue的映射关系-Binding), ​​Broker​​​收到消息后根据​​Routing Key​​​查询投递的目标​​Queue​​.
  • ​Consumer​​​向​​Broker​​​发送订阅消息时会指定自己监听哪个​​Queue​​​,当有数据到达​​Queue​​​时​​Broker​​​会推送数据到​​Consumer​​.

(2)交换器的生命周期
每台AMQP服务器都预先创建了许多交换器实例,它们在服务器启动时就存在并且不能被销毁。如果你的应用程序有特殊要求,则可以选择自己创建交换器,并在完成工作后进行销毁。
(3)队列的生命周期
这里主要有两种消息队列的生命周期,即​​​持久化消息队列​​​和​​临时消息队列​​​。持久化消息队列可被​​多个消费者共享​​​,不管是否有消费者接收,它们都可以独立存在。临时消息队列对​​某个消费者是私有​​的,只能绑定到此消费者,当消费者断开连接时,该消息队列将被删除。

功能命令

​AMQP协议文本​​​是分层描述的,在不同主版本中划分的层次是有一定区别的。
0-9 版本共分两层: ​​Functional Layer (功能层)​​​和​​Transport Layer (传输层)​

  • 功能层定义了​​一系列命令​​,这些命令按功能逻辑组合成不同的类(Class),客户端应用可以利用它们来实现自己的业务功能。
  • 传输层将功能层所接收的消息传递给服务器经过相应处理后再返回,处理的事情包括​​信道复用、帧同步、内容编码、心跳检测、数据表示和错误处理​​​等.
    0-10 版本则分为三层: ​​Model Layer (模型层)​​​、​​Session Layer (会话层)​​​和​​Transport Layer(传输层)​​。
  • 模型层定义了一套命令,客户端应用利用这些命令来实现业务功能。
  • 会话层负责将命令从客户端应用传递给服务器,再将服务器的响应返回给客户端应用,会话层为这个传递过程提供了可靠性、同步机制和错误处理。
  • 传输层负责提供帧处理、信道复用、错误检测和数据表示

消息数据格式

所有的消息​​必须有特定的格式来支持​​​,这部分就是在​​传输层中定义​​​的。​​AMQP是二进制协议​​,协议的不同版本在该部分的描述有所不同。0-9-1版本为例,看一下该版本中的消息格式

《分布式消息中间件实践》 读书笔记_服务器_04


所有的​​消息数据​​​都被组织成​​各种类型的帧(Frame)​​​,帧可以携带​​协议方法​​​和​​其他信息​​​,所有​​帧​​​都有同样的格式,都由一个​​帧头(header, 7个字节)​​​、​​任意大小的负载(payload)​​​和​​一个检测错误的结束帧(frame-end)字节​​组成。其中:

  • ​帧头​​​包括一个​​type字段​​​、一个​​channel​​​字段和一个​​size​​字段;
  • ​帧负载​​​的格式依赖帧类型(type)
    要读取一个​​​帧​​需要三步。
  • ①读取帧头,检查​​帧类型​​​和​​通道(channel)​​.
  • ②根据​​帧类型​​​读取​​帧负载​​并进行处理。
  • ③读取结束帧字节。
    ​​​AMQP​​定义了如下帧类型。
  • type =1, “METHOD”:方法帧;
  • type=2, “HEADER”:内容头帧;
  • type=3,“BODY”:内容体帧;
  • type=4, “HEARTBEAT”:心跳帧通道

编号为0的代表​​全局连接中的所有帧​​​, 1-65535代表​​特定通道的帧​​​。​​size​​​字段是指​​帧负载​​​的大小,它的数值不包括​​结束帧字节​​​。​​AMQP​​​使用​​结束帧​​​来​​检测​​错误客户端和服务器实现引起的错误。

2.2 MQTT

MQTT (​​Message Queuing Telemetry Transport,消息队列遥测传输​​​)是IBM开发的一个即时通信协议,该协议支持​​所有平台​​​,几乎可以把所有​​联网物品​​​和​​外部连接​​​起来,被用来当作​​传感器​​​和​​制动器​​​的​​通信协议​​。

目前MQTT协议版本为​​2014​​​年发布的​​MQTT 3.1.1​​​,它是一个​​基于TCP/IP协议​​​、可提供​​发布/订阅消息模式​​​、十分轻量级的​​通信协议​​​。除标准版外,还有一个​​简化版MQTI-SN​​​,它基于​​非TCP/IP协议(如ZigBee协议)​​​,该协议主要为​​嵌入式设备​​​提供​​消息通信​​。

这里主要介绍标准版MQTT 3.1.1,该协议是一个基于​​客户端-服务器​​​的消息​​发布/订阅传输协议​​​,其特点是​​轻量、简单、开放和易于实现​​。正因为这些特点,使它常应用于很多机器计算能力有限、低带宽、网络不可靠的远程通信应用场景中。

IBM WebSphere, MQ Telemetry, IBM Messagesight. Mosquitto, Eclipse Paho, emqttd Xively.m2m.io, webMethods, Nirvana Messaging, ​​RabbitMQ​​​ Apache ​​ActiveMQ​​, Apache Apollo,Moquette, HiveMQ, Mosca, Litmus Automation Loop. JoramMQ, ThingMQ, VerneMQ.

主要概念

所有基于​​网络连接​​​的应用都会有​​客户端(Client)​​​和​​服务器(Server)​​​,而在​​MQTT协议​​​中使用者有三种身份:​​发布者(Publisher)​​​、代​​理(Broker)​​​和​​订阅者(Subscriber)​​​。其中消息的·发布者和订阅者都是客户端,​​消息代理是服务器​​​,消息发布者可以同时是订阅者。一条消息的流转过程是这样的:先由​​消息发布者​​​发布消息到​​代理服务器​​​,在消息中会包含​​主题(Topic)​​​,之后​​消息订阅者​​​如果订阅了该主题的消息,将会收到​​代理服务器​​推送的消息.(基于观察者模式理解)

《分布式消息中间件实践》 读书笔记_客户端_05

下面介绍MQTT协议中的基本组件。

(1)网络连接(Network Connection):网络连接指客户端连接到服务器时所使,用的底层​​传输协议​​​, 由该连接来负责提供有序的、可靠的、​​基于字节流​​​的​​双向传输​

(2)应用消息(​​Application Message​​​):应用消息指通过​​网络所传输​​​的​​应用数据​​​,该数据一般包括​​主题​​​和​​负载​​两部分。

(3)主题(​​Topic​​​):主题相当于​​应用消息的类型​​,消息订阅者订阅后,就会收到该主题的消息内容。

(4)负载(Payload):负载指消息订阅者具体接收的​​内容​

(5)客户端(Client):客户端指使用​​MQTT​​​的程序或设备。客户端总是通过​​网络连接​​​到​​服务端​​,它可以发布应用消息给其他相关的客户端、订阅消息用以请求接收相关的应用消息、取消订阅应用消息、从服务器断开连接等。

(6)服务器(Server): 服务器也是指程序或设备,它作为发送消息的客户端和请求订阅的客户端之间的中介。服务器的功能包括接收来自客户端的网络连接、接收客户端发布的应用消息、处理客户端的订阅和取消订阅的请求、转发应用消息给相应的客户端等。

(7) 会话(Session):客户端与服务器建立连接之后就是一个会话,客户端和服务器之间通过会话来进行状态交互。会话存在于一个网络连接之间,也可能会跨越多个连续的网络连接。会话主要用于客户端和服务器之间的逻辑层面的通信。

(8)订阅(Subscription):​​订阅一般与一个会话关联​​,会话可以包含多于一个的订阅。订阅包含一个主题过滤器和一个服务质量(Qos)等级。会话的每个订阅都有一个不同的主题过滤器。

(9)主题名(Topic Name):​​主题名​​​是附加在消息上的一个​​标签​​​,该标签与服务器的订阅相匹配,服务器会根据该​​标签​​​将消息发送给与​​订阅所匹配的​​每个客户端。

(10)主题过滤器(Topic Filter): ​​主题过滤器​​是订阅中包含的一个表达式,用于表示相关联的一个或多个主题。主题过滤器可以使用通配符。

(11) MQTT控制报文(MQTT Control Packet): ​​MQTT控制报文​​​实际上就是通过网络连接发送的信息​​数据包​​。

消息数据格式

​MQTT​​​协议是通过交换预定义的​​MQTT控制报文​​​来通信的,·内容由三部分组成
固定报头(Fixed header):存在于​​​所有控制报​​​文中,内容包含​​控制报文类型​​​、相应的​​标识位​​​和​​剩余长度​​​可变报头(Variable header):存在于​​部分控制报文​​中,由​​固定报头中​​的​​控制报文类型​​决定是否需要​​可变报头​​,以及​​可变报头​​的具体内容。
消息体(Payload):存在于​​部分控制报文​​中,表示客户端接收到的​​具体内容​​。

嗯,有些多,先不看额,遇到在深入学习。😃

2.3 STOMP

​STOMP (Streaming Text Orientated Messaging Protocol,流文本定向消息协议)​​​是一个简单的​​文本消息传输协议​​​,它提供了一种​​可互操作​​​的连接格式,允许​​客户端​​​与​​任意消息服务器(Broker)​​​进行交互。在设计​​STOMP​​​时借鉴了​​HTTP​​​的一些理念,将简易性、互通性作为其主要设计哲学,这使得​​STOMP协议​​的客户端的实现很容易。

主要介绍​​STOMP 1.2​​​版本协议的相关内容。​​STOMP​​​被设计成​​轻量级的协议​​​,使得很容易用其他语言来实现客户端和服务器端,因此它在多种语言和平台上得到广泛应用。目前有很多​​STOMP消息中间件服务器​​,如下都是STOMP协议的服务器端实现。

Apache Apollo, Apache ​​ActiveMQ​​​, ​​RabbitMQ​​ HornetQ, Stampy, StompServer.

嗯,有些多,简单了解下,先不看额,遇到在深入学习。😃

2.4 XMPP

​XMPP (可扩展通信与表示协议)​​​是一种​​基于XML​​​的​​流式即时通信协议​​​,它的特点是将​​上下文信息等嵌入到用XML表示的结构化数据​​​中,使得人与人之间、人与应用系统之间,以及应用系统之间能即时相互通信。XMPP的基本语法和语义最初主要是由​​Jabber​​​开放源代码社区于​​1999​​​年开发的,其基础部分早在​​2002-2004​​年就得到了互联网工程任务组(IETF)的批准。

​XMPP​​​定义了用于通信网络实体之间的开放协议的规范,其规范说明由一系列作用不同的​​RFC文档​​​组成, 目前核心规范主要包括RFC 6120, RFC 6121,RFC 7622及RFC 7395中定义的​​WebSocket​​绑定。

2.5 JMS

​JMS (Java Message Service)​​​即​​Java消息服务应用程序接口​​​,是Java平台中面向消息中间件的一套规范的​​Java API接口​​​,用于在两个应用程序之间或分布式系统中发送消息,进行​​异步通信​​​。这套规范由​​SUN提出​​, 目前主要使用的版本有两个:一个是2002年发布的1.1版;yi个是2013年发布的2.0版。

不同于本章上面所介绍的AMQP, MQTT,STOMP,XMPP等协议,​​JMS并不是消息队列协议的一种​​​,更不是​​消息队列产品​​​,它是与​​具体平台无关的API​​​, 目前市面上的绝大多数消息中间件厂商都支持JMS接口规范。换句话说,你可以使用​​JMS API来连接支持AMQP, STOMP等协议的消息中间件产品(比如ActiveMQ, RabbitMQ等)​​​,在这一点上它与​​Java​​​中的​​JDBC​​​的作用很像,我们可以用​​JDBC API​​来访问具体的数据库产品(比如OracleMySQL等)。

1 体系架构

JMS的作用是提供​​通用接口​​​保证基于​​JMS API​​​编写的程序适用于​​任何一种模型​​,使得在更换消息队列提供商的情况下应用程序相关代码也不需要做太大的改动。

  • (1)​​点对点​​​模型在​​点对点(Point to Point)模型​​​中,应用程序由​​队列(Queue)​​​、​​发送者(Sender)​​​和​​接收者(Receiver)​​​组成。每条消息都被发送到一个特定的队列中,接收者从队列中获取消息

    ​​​队列中一直保留着消息,直到它们被接收或超时​​。点对点 模型的特点如下
  • 每条消息只有一个接收者,消息一旦被接收就不再保留在消息队列中了。
  • 发送者和接收者之间在时间上没有依赖。也就是说,当消息被发送之后,不管接收者有没有在运行,都不会影响消息被发送到队列中。
  • (2)​​发布/订阅模型​​​在​​发布/订阅(Pub/Sub)​​​模型中,应用程序由​​主题(Topic)​​​、​​发布者(Publisher)​​​和​​订阅者(Subscriber)​​​组成。发布者发布一条消息,该消息通过主题传递给所有的订阅者

    在这种模型中,​​​发布者​​​和​​订阅者​​​彼此不知道对方,它们是匿名的并且可以​​动态发布​​​和​​订阅主题​​​。主题用于​​保存​​​和​​传递消息​​​,并且会一直保存消息直到​​消息被传递给订阅者​​。发布/订阅模型的特点如下:
  • 每条消息可以有多个订阅者。
  • 发布者和订阅者之间有时间上的依赖。一般情况下,某个主题的订阅者需要在创建了订阅之后才能接收到消息,而且为了接收消息订阅者必须保持运行的状态。
  • JMS允许订阅者创建一个​​可持久化的订阅​​,这样即使订阅者没有运行也能接收到所订阅的消息。
  • 每条消息都会传送给该主题下的​​所有订阅者​​。通常发布者不会知道也意识不到哪一个订阅者正在接收消息。

2.基本概念

按照JMS规范中所说的,一个JMS应用由如下几个部分组成。

  • ​JMS客户端(JMS Client)​​:指发送和接收消息的Java程序。
  • ​非JMS客户端(Non-JMS Client)​​:指使用消息系统原生的客户端API代替JMS的客户端。如果应用程序在JMS规范前就已存在,则它可能同时包含JMS客户端和非JMS客户端。
  • ​消息(Message)​​:每个应用都定义了一组消息,用于多个客户端之间的消息通信。
  • ​JMS提供商(JMS Provider)​​:指实现了JMS API的实际消息系统。
  • ​受管对象(Administered Object)​​​:指由管理员创建,并预先配置好给客户端使用的JMS对象。JMS中的受管对象分为两种,即​​ConnectionFactory (客户端使用这个对象来创建到提供者的连接)​​​和​​Destination (客户端使用这个对象来指定发送或接收消息的目的地)​​。

而具体到JMS应用程序,则主要涉及以下基本概念。

  • ​生产者(Producer)​​:创建并发送消息的JMS客户端,在点对点模型中就是发送者,在发布/订阅模型中就是发布者。
  • ​消费者(Consumer)​​:接收消息的JMS客户端,在点对点模型中就是接收者,在发布/订阅模型中就是订阅者。
  • ​客户端(Client)​​:生产或消费消息的基于Java的应用程序或对象。
  • ​队列(Queue )​​:一个容纳被发送的等待阅读的消息的区域。它是点对点模型中的队列。
  • ​主题(Topic)​​:一种支持发送消息给多个订阅者的机制。它是发布/订阅模型中的主题。
  • ​消息(Message)​​​:在JMS客户端之间传递的数据对象。JMS消息又包括​​消息头、属性和消息体​​三部分。

编程接口

(1) ConnectionFactory接口(​连接工厂​)

(2) Destination接口(​目的地​​) : Destination是一个包装了消息目的地标识符的受管对象。​​消息目的地是指消息发布和接收的地点,消息目的地要么是队列要么是主题​​​。对于消息生产者来说,它的Destination是某个队列或某个主题;对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以Destination实际上就是两种类型的对象:​​Queue和Topic​​,可以通过JNDI来查找Destination.

(3) Connection接口(​连接​): Connection表示在客户端和JMS系统之间建立的连接(实际上是对TCP/IP Socket的包装)。Connection可以产生一个或多个Session,跟ConnectionFactory一样, Connection也有两种类型:QueueConnection和TopicConnection.

(4) Session接口(​会话​): Session是实际操作消息的接口,表示一个单线程的上下文,用于发送和接收消息。因为会话是单线程的,所以消息是按照发送的顺序一个个接收的。可以通过Session创建生产者、消费者、消息等。在规范中Session还提供了事务的功能。Session也分为两种类型: QueueSession和TopicSession

(5) MessageProducer接口(​消息生产者​​): 消息生产者由Session创建并用于将消息发送到Destination.消费者可以同步(阻塞模式)或异步(非阻塞模式)接收队列和主题类型的消息。消息生产者有两种类型:​​ QueueSender​​​和​​TopicPublisher.​

(6) MessageConsumer接口(​消息消费者​​):消息消费者由Session创建,用于接收被发送到Destination的消息。消息消费者有两种类型: QueueReceiver和TopicSubscriber.
(7) Message接口(​​消息​): 消息是在消费者和生产者之间传送的对象,即将消息从一个应用程序发送到另一个应用程序。

(8) MessageListener (​消息监听器​​): 如果注册了消息监听器,那么当消息到达时将自动调用监听器的​​onMessage​​方法。

JMS 1.0 示例

消息消費著

package msg_queue.jms;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.TimeUnit;

/**
* @Classname QueueConsumer
* @Description TODO 消息消費著
* @Date 2021/7/9 15:12
* @Created Li Ruilong
*/
public class QueueConsumer {

public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKER_URL);

Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//创建队列,作为消费者消费消息的目的地
Queue sessionQueue = session.createQueue("test");

// 消費者
MessageConsumer consumer = session.createConsumer(sessionQueue);

consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());

} catch (JMSException e) {
e.printStackTrace();
}finally {
try {
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}

}
});
TimeUnit.MICROSECONDS.sleep(100000);
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}


}

}

消息生产者

package msg_queue.jms;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* @Classname QueueProducer
* @Description TODO 消息生产者
* @Date 2021/7/9 0:07
* @Created Li Ruilong
*/
public class QueueProducer {

public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args) {

// 创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKER_URL);
Session session = null;
Connection connection = null;
try {
// 创建连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建会话
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建队列,需要指定队列名称,消息生产者和消费者将根据它来发送、接收对应的消息
Queue sessionQueue = session.createQueue("test");
// 消息生產者
MessageProducer producer = session.createProducer(sessionQueue);
TextMessage message = session.createTextMessage("测试一个点对点的一条消息");
producer.send(message);
session.commit();

} catch (JMSException e) {
e.printStackTrace();
}finally {
try {
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}

}

}
}

JMS 2.0 示例

在JMS 2.0中主要进行了​​易用性​​​方面的改进,这样可以帮助开发者减少代码的编写量。新的API被称作​​简化的API ​​​(Simplified API),其比JMS 1.1 API更简单易用;后者被称作​​经典API​​ (Classic API). 简化的API由三个新接口构成: ​JMSContext.JMSProducer和JMSConsumer.​

  • ​JMSContext​​:用于替换经典API中单独的Connection和Session
  • ​JMSProducer​​:用于替换经典API中的MessageProducer,其支持以链式操作(方法链)方式配置消息传递选项、消息头和消息属性。
  • ​JMSConsumer​​:用于替换经典API中的MessageConsumer,其使用方式与JMSProducer类似。

简化的API不仅提供了经典API的所有特性,还增加了一些其他特性。经典API并没有被弃用,而是作为JMS的一部分被保留下来。下面通过发送文本消息的例子来看一下两者之间的区别。

第3章RabbitMQ

3.1 简介

1. RabbitMQ特点

RabbitMQ是一个由​​Erlang​​​语言开发的基于​​AMOP标准​​​的​​开源​​​实现。​​RabbitMQ​​​最初起源于​​金融系统​​​,用于在​​分布式系统​​​中存储转发消息,在​​易用性​​​、​​扩展性​​​、​​高可用性​​​等方面表现不俗。其具体​​特点​​包括:

  • 保证可靠性( Reliability), ​​RabbitMQ​​​使用一些机制来保证可靠性,如​​持久化、传输确认、发布确认​​等。
  • 具有灵活的​​路由(Flexible Routing)功能​​​。在消息进入队列之前,是通过​​Exchange (交换器)​​​来​​路由消息​​​的。对于典型的路由功能, ​​RabbitMQ​​​已经提供了一些内置的​​Exchange​​​来实现。针对更复杂的路由功能,可以将​​多个Exchange​​​绑定在一起,也可以通过插件机制来实现自己的​​Exchange​​.
  • 支持消息集群(Clustering),多台​​RabbiMQ​​​服务器可以组成一个集群,形成一个逻辑​​Broker​​.
  • 具有高可用性(Highly Available),队列可以在​​集群中​​​的机器上进行​​镜像​​,使得在部分节点出现问题的情况下队列仍然可用。
  • 支持多种协议(Multi-protocol), ​​RabbitMQ​​​除支持​​AMQP​​​协议之外,还通过插件的方式支持其他消息队列协议,比如​​STOMP, MQTT​​等。
  • 支持多语言客户端(Many Client),​​RabbitMQ​​几乎支持所有常用的语言,比如Java. .NET, Ruby等
  • 提供管理界面(Management UI), ​​RabbitMQ​​提供了一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面
  • 提供跟踪机制(Tracing), ​​RabbitMQ​​提供了消息跟踪机制,如果消息异常,使用者可以查出发生了什么情况。
  • 提供插件机制(Plugin System), ​​RabbitMQ​​提供了许多插件,从多方面进行扩展,也可以编写自己的插件.

2. RabbitMQ基本概念

​RabbitMQ​​​是​​AMQP​​​协议的一个​​开源实现​​​,所以其​​基本概念​​​也就是​​AMQPt​​中的基本概念。如图是RabbitMQ的整体架构图。

《分布式消息中间件实践》 读书笔记_客户端_06

  • ​Message (消息)​​​:消息是不具名的,它由​​消息头​​和​​消息体​​组成。消息体是不透明的,而消息头则由一系列可选属性组成,这些属性包括 ​​routing-key (路由键)​​,​​priority (相对于其他消息的优先级)​​、 ​​delivery-mode (指出该消息可能需要持久化存储)等​​。
  • ​Publisher (消息生产者)​​:一个向交换器发布消息的客户端应用程序。
  • ​Exchange (交换器)​​​:用来​​接收生产者​​发送的消息,并将这些​​消息路由给服务器​​中的​​队列​​。.
  • ​Binding (绑定)​​​:用于​​消息队列​​和​​交换器​​之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的​​路由规则​​,所以可以将交换器理解成一个由绑定构成的​​路由表​​。
  • ​Queue (消息队列)​​​:用来保存消息直到发送给消费者。它是​​消息的容器​​,也是​​消息的终点​​。一条消息可被投入一个或多个队列中。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  • ​Connection (网络连接)​​:比如一个TCP连接。
  • ​Channel (信道)​​​:​​多路复用​​连接中的一条独立的​​双向数据流通道​​。​​信道​​是建立在​​真实的TCP​​连接内的​​虚拟连接​​, ​​AMQP​​命令都是通过信道发送出去的,不管是​​发布消息​​、​​订阅队列​​还是​​接收消息​​,这些动作都是通过​​信道​​完成的。
    因为对于操作系统来说,建立和销毁TCP连接都是非常昂贵的开销,所以引入了信道的概念,以复用一个TCP连接。.
  • ​Consumer (消息消费者)​​:表示一个从消息队列中取得消息的客户端应用程序。
  • ​Virtual Host (虚拟主机,在RabbitMQ中叫vhost)​​​:表示一批​​交换器、消息队列​​和相关对象。​​虚拟主机​​是共享相同的​​身份认证和加密环境的独立服务器域​​。本质上每个vhost就是一台缩小版的​​RabbitMQ服务器​​,它拥有自己的​​队列、交换器、绑定​​和​​权限机制​​。​​vhost​​是​​AMQP​​概念的基础,必须在连接时指定, RabbitMQ默认的vhost是"1"
  • ​Broker​​:表示消息队列服务器实体。

(1) AMQP中的消息路由

在​​AMQP​​​中增加了​​Exchange​​​和​​Binding​​​的角色。生产者需要把消息发布到​​Exchange​​​上,消息最终到达队列并被消费者接收,而​​Binding​​​决定​​交换器上​​​的消息应该被发送到哪个​​队列​​中。

《分布式消息中间件实践》 读书笔记_消息队列_07

(2)交换器类型

不同类型的​​交换器分发消息​​​的​​策略​​​也不同,目前交换器有4种类型: ​​Direct​​​, ​​Fanout​​​, ​​Topic​​​,​​Headers​​​。其中​​Headers​​​交换器匹配​​AMQP​​​消息的​​Header​​​而不是​​路由键​​​。此外, ​​Headers​​​交换器和​​Direct​​交换器完全一致,但性能相差很多,目前几乎不用了,所以下面我们看另外三种类型。

Direct交换器

如果消息中的路由键(routing key)和Binding中的绑定键(binding key)一致,交换器就将消息发送到对应的队列中.

《分布式消息中间件实践》 读书笔记_消息队列_08


路由键与队列名称要​​完全匹配​​​,如果将一个队列绑定到交换机要求路由键为“dog",则只转发routing key标记为"dog"的消息,不会转发"dog.puppy"消息,也不会转发"dog.guard "消息等。​​Direct交换器是完全匹配、单播的模式​​。

Fanout交换器

Fanout交换器不处理路由键,只是简单地将队列绑定到交换器

《分布式消息中间件实践》 读书笔记_服务器_09


发送到交换器的每条消息都会被转发到与该交换器绑定的所有队列中,这很像​​子网广播​​,子网内的每个主机都获得了一份复制的消息。通过Fanout交换器转发消息是最快的。

Topic交换器

Topic交换器通过模式匹配分配消息的路由键属性,将路由键和某种模式进行匹配,此时队列需要绑定一种模式。

《分布式消息中间件实践》 读书笔记_分布式_10


Topic交换器将​​路由键和绑定键的字符串切分成单词​​​,这些单词之间用点“.”隔开,该交换器会识别两个通配符: ​​“#”和“*”,其中“#”匹配0个或多个单词, “*”匹配不多不少一个单词。​

3.2 工程实例

RabbitMQ官网:​​https://www.rabbitmq.com/​​

基于Docker的安装:​​https://registry.hub.docker.com/_/rabbitmq?tab=description&page=2&ordering=last_updated​​​ 这里我们先在阿里云上装一个​​RabbitMQ​​,用​​Docker​​来安装,直接拉去镜像。

# 启动docker服务
[root@liruilong ~]# systemctl restart docker
# 查看镜像
[root@liruilong ~]# docker images
#指定版本,该版本包含了web控制页面
[root@liruilong ~]# docker pull rabbitmq:management

#运行容器:
#方式一:默认guest 用户,密码也是 guest
[root@liruilong ~]# docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

#方式二:设置用户名和密码
[root@liruilong ~]# docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

# 发布Docker服务,将端口映射到15672,5672
[root@liruilong ~]# docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
2189f2fa53f1e76306a2ad422e0fa33bca1ae0f3ee77514573d71aca9ce24801
[root@liruilong ~]#

这里需要注意的是端口绑定,需要把​访问​​端口和​​管理​​端口同时绑定。如果是ESC的话,需要配置安全组

《分布式消息中间件实践》 读书笔记_服务器_11

《分布式消息中间件实践》 读书笔记_消息队列_12

3.2.1Java访问RabbitMQ实例

RabbitMQ支持多种语音访问。添加依赖

<dependency> 
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>

消息生产者

package msg_queue.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
* @return
* @Description TODO 消息生产者
* @author Liruilong
* @date 2021/7/9 22:16
**/
public class Producer {

public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
//设置 RabbitMQ 地址
factory.setHost("39.97.241.18");
//默认访问5672端口 factory.setPort(5672);
factory.setVirtualHost("/");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//创建信道
Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
// direct 类型,路由键与队列名称要`完全匹配`
channel.exchangeDeclare(exchangeName, "direct", true);
// 定义 路由键
String routingKey = "testRoutingKey";
//发布消息
byte[] messageBodyBytes = "这是我第一次学习Rabbitmq".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

channel.close();
conn.close();
}
}

首先创建一个连接工厂,再根据连接工厂创建连接,之后从连接中创建信道,接着声明一个交换器和指定路由键,然后才发布消息,最后将所创建的信道、连接等资源关闭。​​代码中的ConnectionFactory, Connection、 Channel都是RabbitMQ提供的API中最基本的类。​

  • ​ConnectionFactory​​是Connection的制造工厂
  • ​Connection​​代表RabbitMQ的Socket连接,它封装了Socket操作的相关逻辑。
  • ​Channel​​是与RabbitMQ打交道的最重要的接口,大部分业务操作都是在Channel中完成的,比如定义队列、定义交换器、队列与交换器的绑定、发布消息等。

消息消费者

package msg_queue.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
* @return
* @Description TODO 消息消费者
* @author Liruilong
* @date 2021/7/9 23:45
**/
public class Consumer {

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("39.97.241.18");
factory.setVirtualHost("/");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//创建信道
final Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
//声明队列
String queueName = channel.queueDeclare().getQueue();
String routingKey = "testRoutingKey";
//绑定队列,通过键 testRoutingKey 将队列和交换器绑定起来
channel.queueBind(queueName, exchangeName, routingKey);

while (true) {
//消费消息
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("消费的路由键:" + envelope.getRoutingKey());
System.out.println("消费的内容类型:" + properties.getContentType());
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
System.out.printf("消费的消息体内容:%s",new String(body, "UTF-8"));

}
});
}
}
}

​消息消费者通过不断循环等待服务器推送消息​​​,一旦有消息过来,就在控制台输出消息的相关内容。一开始的创建连接、创建信道、声明交换器的代码和发布消息时是一样的,但在​​消费消息​​​时需要指定​​队列名称​​​,所以这里多了​​绑定队列​​这一步,接下来是循环等待消息过来并打印消息内容

第4章ActiveMQ

第5章Kafka

第6章RocketMC


举报

相关推荐

0 条评论