Spring与消息
概述
- 消息中间件的主要作用:提高系统异步通信、扩展解耦能力
- 消息服务的两个重要概念:消息代理(message broker)和目的地(destination)
- 消息队列的两种形式的目的地:队列(点对点消息通信)和主题(发布/订阅消息通信)
- 点对点式:消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移除队列;消息只有唯一的发送着和接收者,但并非说只能有一个接受这
- 发布订阅式:发送者(发布者)发送消息主题,多个接收者(订阅者)监听(订阅)这个主题,消息会被接收者接收
- JMS(Java Message Service)Java消息服务:基于jvm消息代理的规范。activeMQ、hornetMQ是JMS实现
- AMQP(Advanced Message Queuing Protocol):高级消息队列,也是一个消息代理的规范,兼容JMS。RabbitMQ是AMQP实现。
- Spring支持:spring-jms提供了对JMS的支持;spring-rabbit提供了对AMQP的支持;需要ConnectionFactory的实现来链接消息代理;提供JmsTemplate、RabbitTemplate来发送消息;@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息;@EnableJms、@EnableRabbit开启支持
- SpringBoot自动配置:JmsAutoConfiguration、RabbitAutoConfiguration
RabbitMQ
核心概念
- Message:由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,包括routing-key、priority、delivery-mode等
- Publisher:消息的生产者
- Exchange:交换其,接收生产者的消息,然后通过绑定的routing-key发送到队列。包含direct、fanout、topic和headers四种类型
- Queue:消息队列,保存消息,等待消费者消费。
- Binding:绑定,消息队列和交换器之间的关联。一个绑定就是基于routing-key将交换器和消息队列连接起来的规则。
- Connection:网络连接
- Channel:信道,多路服用连接中的一条独立的双向数据流通道。建立到网络连接上的虚拟连接
- Consumer:消费者,消费消息
- Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是/。
- Broker:消息队列服务器实体
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xykbnnDT-1688290522499)(src/main/resources/image/RabbitMQ概览.png)]](https://file.cfanz.cn/uploads/png/2023/07/13/16/W4TC073U2e.png)
使用教程
- 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
- 使用RabbitListener进行消息监听
@Service
public class OrderService {
@RabbitListener(queues = "exchange")
public void receive(Order order) {
System.out.println("order = " + order);
}
}
public class Order {
private String orderNo;
private String productName;
private Double price;
private Long number;
public String getOrderNo() {
return orderNo;
}
public void setOrderNo(String orderNo) {
this.orderNo = orderNo;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
public Long getNumber() {
return number;
}
public void setNumber(Long number) {
this.number = number;
}
@Override
public String toString() {
return "Order{" +
"orderNo='" + orderNo + '\'' +
", productName='" + productName + '\'' +
", price=" + price +
", number=" + number +
'}';
}
}
- 使用AmqpAdmin管理组件
@SpringBootTest
@RunWith(SpringRunner.class)
public class Test01 {
@Autowired
private AmqpAdmin amqpAdmin;
@Test
public void test01() {
amqpAdmin.declareExchange(new DirectExchange("test_exchange"));
amqpAdmin.declareQueue(new Queue("test_queue"));
amqpAdmin.declareBinding(new Binding(
"test_queue",
Binding.DestinationType.QUEUE,
"test_exchange",
"test_routingKey",
null
));
}
}
- 在Test中使用RabbitTemplate发送消息,消息实体为Order
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {
@org.junit.Test
public void name02() {
Order order = new Order();
order.setOrderNo("no0001");
order.setProductName("a book");
rabbitTemplate.convertAndSend("test_exchange","test_routingKey", order);
}
}
- 监听接收到消息
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AtY48bDb-1688290522500)(src/main/resources/image/接受消息.png)]](https://file.cfanz.cn/uploads/png/2023/07/13/16/67LaU16386.png)