1.消息中间件应用场景
1.1异步处理
比如场景:用户注册,需要执行三个业务逻辑,分别为写入用户表,发注册邮件以及注册短信。
(1)串行方式
将注册信息写入数据库成功后,发送注册邮件,在发送注册短信。以上三个任务全部完成后,返回给客户端。
(2)并行方式
将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。
(3)异步处理
引入消息中间件,将部分的业务逻辑,进行异步处理,改造后的架构如下:
可以发现,系统的吞吐量明显提高。
1.2应用解耦
场景:用户下单后,订单系统需要通知库存系统。
传统做法是订单系统调用库存系统的接口。缺点:假设库存系统无法访问,则导致订单失败。
解决方案:引入消息队列
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。
两个系统之间互不影响。因为下单后,订单系统写入消息队列就不再关心其他的后续操作。实现了订单系统和库存系统的应用解耦。
1.3流量消峰
流量消峰一般在秒杀或团抢活动中使用广泛。一般会因为流量暴增导致应用挂掉。解决这个问题,一般需要在应用前端加入消息队列。
通过加入消息队列完成以下功能:
(1)可以控制活动人数
(2)可以缓解短时间内高流量压垮应用
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理。
2.JMS消息模型
JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
消息中间件一般有两种传递模式:点对点模式(P2P)和发布-订阅模式(Pub/Sub)
(1)P2P(Ponit to Point) 点对点模型(Queue队列模型)
(2)Publish/Subscribe 发布/订阅模型(Topic主题模型)
2.1点对点模型
点对点模型:即生产者和消费者之间的消息往来。
每个消息都被发送到特定的消息队列,接受者从队列中获取消息。队列保留着消息,直到它们被消费或超时。
特点:
- 每个消息只有一个消费者,一旦被消费,消息就不再在消息队列中;
- 发送者和接受者之间在时间上没有依赖性,也就是当发送者发送了消息之后,不管接受者有没有在运行,它不会影响到消息被发送到队列。
- 接收者在成功接收消息之后必须要向对列应答成功。
2.2发布/订阅模型
包含三个角色:主题(Topic),发布者,订阅者。多个发布者将消息发送到topic,系统将这些消息投递到订阅此topic的订阅者。
发送者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。
特点:
- 每个消息可以有多个消费者;
- 发布者和订阅者之间有时间上的依赖性(必须先订阅主题,再发送消息);
- 订阅者必须保持运行的状态,才能接受发布者发布的消息。
3.JMS核心API
(1)ConnectionFactory
创建Connection对象工厂,针对两种不同的JMS消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。
(2)Destination
Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于生产者来说,Destination是某个队列或主题;对于消费者来说,Destination也是某个队列或主题。因此,Destination实际上就是两种类型的对象:Queue、Topic。
(3)Connection
Connection是客户端和JMS系统之间建立的连接。Connection可以生产一个或多个Session。
(4)Session
Session是对消息进行操作的接口,可以通过Sessionu创建生产者、消费者、消息。Session提供了事务的功能,如果需要使用Session发送/接收消息时,可以将这些发送/接收动作放到一个事务中。
(5)Producer
Producer消息生产者,消息生产者由Session创建,并用于将消息发送到Destination。
消息生产者有两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法,send和publish发送消息。
(6)Consumer
Consumer消息消费者,消息消费者者由Session创建,并用于接收被发送到Destination的消息。消息消费者有两种类型:QueueRecieve和TopicSubscriber。可分别通过session的createReceiver或createSubscriber来创建。
(7)MessageListener
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。
4.原生JMS API操作ActiveMQ
4.1 点对点模式
//1.创建连接工厂 // 2.创建链接 // 3.打开链接 // 4.创建session // 5.创建/指定目标地址 // 6.创建消息生产者/消费者 // 7.创建/接收消息 // 8.发送消息 // 9.释放资源
1.创建空项目,然后基于maven创建一个普通java项目,导入依赖
2.创建消息生产者类,按照9个步骤走
package org.weiwei;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 消息生产者
*/
public class Produer {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.109:61616/");//注意tcp端口是61616,http端口是8161
// 2.创建链接
Connection connection = factory.createConnection();
// 3.打开链接
connection.start();
// 4.创建session
/*
参数1:是否开启事务操作
参数2:消息确机制
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建目标地址
Queue queue = session.createQueue("queue1");
// 6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
// 7.创建消息
TextMessage testMessage = session.createTextMessage("test message");
// 8.发送消息
producer.send(testMessage);
System.out.println("消息发送完成");
// 9.释放资源
session.close();
connection.close();
}
}
3.运行producer
可以看到有1条消息等待被消费
4.创建新模块,同样导入相同依赖,创建消费者类
package org.weiwei;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 点对点消费者方式(第一种方案)
*/
public class P2P_Consumer1 {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.109:61616/");
// 2.创建链接
Connection connection = factory.createConnection();
// 3.打开链接
connection.start();
// 4.创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.指定目标地址
Queue queue = session.createQueue("queue1");
// 6.创建消息消费者
MessageConsumer consumer = session.createConsumer(queue);
// 7.接收消息
while (true){
Message message = consumer.receive();
//如果没有消息了,就结束
if (message==null){
break;
}
//如果还有消息,判断是什么类型的消息
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("接收的消息:"+textMessage.getText());
}
}
}
}
输出:
5.点对点消息消费者(第二种方案:推荐)
package org.weiwei;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 点对点消费者方式2
*/
public class P2P_Consumer2 {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.109:61616/");
// 2.创建链接
Connection connection = factory.createConnection();
// 3.打开链接
connection.start();
// 4.创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.指定目标地址
Queue queue = session.createQueue("queue1");
// 6.创建消息消费者
MessageConsumer consumer = session.createConsumer(queue);
// 7.设置消息监听器,接收消息
consumer.setMessageListener(new MessageListener() {
//处理消息
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收的消息(2):" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//注意:在监听器模式下,千万不要关闭连接,一旦关闭,消息无法接收
}
}
4.2发布/订阅模式
(1)消息生产者变化不大,改变的地方为第五步
package org.weiwei;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 发布订阅模式---消息生产者
*/
public class PS_Produer {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.109:61616/");//注意tcp端口是61616,http端口是8161
// 2.创建链接
Connection connection = factory.createConnection();
// 3.打开链接
connection.start();
// 4.创建session
/*
参数1:是否开启事务操作
参数2:消息确机制
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建目标地址
Topic topic = session.createTopic("topic1");
// 6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
// 7.创建消息
TextMessage testMessage = session.createTextMessage("test message-----topic");
// 8.发送消息
producer.send(testMessage);
System.out.println("topic消息发送完成");
// 9.释放资源
session.close();
connection.close();
}
}
(2)消息消费者同理改变的地方为第五步
package org.weiwei;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 发布订阅模式消费者方式--监听器模式
*/
public class PS_Consumer2 {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.109:61616/");
// 2.创建链接
Connection connection = factory.createConnection();
// 3.打开链接
connection.start();
// 4.创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.指定目标地址
Topic topic = session.createTopic("topic1");
// 6.创建消息消费者
MessageConsumer consumer = session.createConsumer(topic);
// 7.设置消息监听器,接收消息
consumer.setMessageListener(new MessageListener() {
//处理消息
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收的消息(2)--topic:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//注意:在监听器模式下,千万不要关闭连接,一旦关闭,消息无法接收
}
}
(3)运行:注意【先运行消费者,再运行生产者------先订阅再消费】
输出:
topic消息发送完成
接收的消息(2)--topic:test message-----topic
5. SpringBoot 整合 ActiveMQ
5.1点对点模式
详情见:
消息中间件解决方案JMS(1)_Java-请多指教的博客-CSDN博客
5.2发布订阅模式
创建一个springboot项目,勾选web和activemq
(1)配置文件
application.properties
(2)启动类开启注解@EnableJms//启动消息队列
(3)配置类
package com.weiwei.activemq_ps.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsMessagingTemplate;
import javax.jms.ConnectionFactory;
import javax.jms.Topic;
@Configuration
public class activemqConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String username;
@Value("${spring.activemq.password}")
private String password;
@Value("${spring.activemq.topic-name}")
private String topicName;
//创建链接工厂
@Bean
public ConnectionFactory connectionFactory(){
return new ActiveMQConnectionFactory(username,password,brokerUrl);
}
//jms的消息模板
@Bean
public JmsMessagingTemplate jmsMessagingTemplate(){
return new JmsMessagingTemplate(connectionFactory());
}
//发布订阅模式消息对象
@Bean
public Topic topic(){
return new ActiveMQTopic(topicName);
}
}
(4)消息生产者
package com.weiwei.activemq_ps.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.Topic;
@RestController
public class PSProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
//消息发送方法
@RequestMapping("/sendtopic.do")
public String sendQueue(String username){
/*
参数1:主题名称
参数2:消息内容
*/
jmsMessagingTemplate.convertAndSend(this.topic,username);
return "send-success-topic";
}
}
(5)消息消费者
package com.weiwei.activemq_ps.controller;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.Message;
import javax.jms.TextMessage;
/**
* 发布订阅模式的消息消费者
*/
@Component //放入IOC容器
public class PSConsumer1 {
@JmsListener(destination = "${spring.activemq.topic-name}")
public void readActiveMessage(Message message){
if (message instanceof TextMessage){
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收topic消息:"+textMessage.getText());
}catch (Exception e){
e.printStackTrace();
}
}
}
}
运行测试:
localhost:8080/sendtopic.do?username=wuwei
6.ActiveMQ消息组成与高级特性
6.1JMS消息组成详解
整个JMS消息协议组成结构:
结构 | 描述 |
JMS Provider | 消息中间件/消息服务器 |
JMS Producer | 消息生产者 |
JMS Consumer | 消息消费者 |
JMS Message | 消息(重要) |
JMS Message消息由三部分组成:
(1)消息头
(2)消息体
(3)消息属性
1.消息头
JMS消息头定义了若干字段,用于客户端与JMS提供者之间识别和发送消息。