一、前言
二、概念
中间件:
非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件。
消息中间件:
关注与数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统。
三、规范
四、ActiveMQ
4.1、下载安装(Win)
Ps1:直接启动:找到 activemq.bat 启动(推荐管理员方式运行)。
Ps2:使用服务启动:找到 InstallService.bat 启动(推荐管理员方式运行)。
4.2、队列模式
AppProducer 类
package com.myimooc.jms.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* App 生产者-队列模式
*/
public class AppProducer {
// 指定ActiveMQ服务的地址
private static final String URL = "tcp://127.0.0.1:61616";
// 指定队列的名称
private static final String QUEUE_NAME = "queue-test";
public static void main(String[] args) throws JMSException {
// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
// 2.创建Connection
Connection connection = connectionFactory.createConnection();
// 3.启动连接
connection.start();
// 4.创建会话(第一个参数:是否在事务中处理)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个目标
Destination destination = session.createQueue(QUEUE_NAME);
// 6.创建一个生产者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
// 7.创建消息
TextMessage textMessage = session.createTextMessage("test" + i);
// 8.发布消息
producer.send(textMessage);
System.out.println("消息发送:" + textMessage.getText());
}
// 9.关闭连接
connection.close();
}
}
AppConsumer 类
package com.myimooc.jms.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* App 消费者-队列模式
*/
public class AppConsumer {
/** 指定ActiveMQ服务的地址 */
private static final String URL = "tcp://127.0.0.1:61616";
/** 指定队列的名称 */
private static final String QUEUE_NAME = "queue-test";
public static void main(String[] args) throws JMSException {
// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
// 2.创建Connection
Connection connection = connectionFactory.createConnection();
// 3.启动连接
connection.start();
// 4.创建会话(第一个参数:是否在事务中处理)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建一个目标
Destination destination = session.createQueue(QUEUE_NAME);
// 6.创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);
// 7.创建一个监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收消息:" + textMessage.getText());
} catch (JMSException e) {
System.out.println("接收消息异常:");
e.printStackTrace();
}
}
});
// 8.关闭连接
//connection.close();
}
}
JMS队列模式:
消息发布出去后,只要有消费者消费就算OK,不存在消费者要先订阅(启动监听)的问题。
4.3、主题模式
AppProducer 类
package com.myimooc.jms.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* App 生产者-主题模式
*/
public class AppProducer {
/** 指定ActiveMQ服务的地址 */
private static final String URL = "tcp://127.0.0.1:61616";
/** 指定主题的名称 */
private static final String TOPIC_NAME = "topic-test";
public static void main(String[] args) throws JMSException {
// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
// 2.创建Connection
Connection connection = connectionFactory.createConnection();
// 3.启动连接
connection.start();
// 4.创建会话(第一个参数:是否在事务中处理)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个目标
Destination destination = session.createTopic(TOPIC_NAME);
// 6.创建一个生产者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
// 7.创建消息
TextMessage textMessage = session.createTextMessage("test" + i);
// 8.发布消息
producer.send(textMessage);
System.out.println("消息发送:" + textMessage.getText());
}
// 9.关闭连接
connection.close();
}
}
AppConsumer 类
package com.myimooc.jms.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* App 消费者-主题模式
*/
public class AppConsumer {
/** 指定ActiveMQ服务的地址 */
private static final String URL = "tcp://127.0.0.1:61616";
/** 指定主题的名称 */
private static final String TOPIC_NAME = "topic-test";
public static void main(String[] args) throws JMSException {
// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
// 2.创建Connection
Connection connection = connectionFactory.createConnection();
// 3.启动连接
connection.start();
// 4.创建会话(第一个参数:是否在事务中处理)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建一个目标
Destination destination = session.createTopic(TOPIC_NAME);
// 6.创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);
// 7.创建一个监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收消息:" + textMessage.getText());
} catch (JMSException e) {
System.out.println("接收消息异常:");
e.printStackTrace();
}
}
});
// 8.关闭连接
//connection.close();
}
}
JMS主题模式:
“订阅者先订阅,发布者后发布消息 ---导致--> 订阅者才能收到消息"
就个人理解,先启动订阅者就是先于发布者监听目标队列,其次再由发布者向目标队列发送消息,这样订阅者才会收到信息。如果在订阅前先发布消息再订阅,那么之前的消息收不到,订阅之后的消息还能收到。