0
点赞
收藏
分享

微信扫一扫

ActiveMQ - 基础篇


一、前言

ActiveMQ - 基础篇_ActiveMQActiveMQ - 基础篇_JMS_02ActiveMQ - 基础篇_消息中间件_03ActiveMQ - 基础篇_JMS_04ActiveMQ - 基础篇_JMS_05


二、概念

中间件:

非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件。

消息中间件:

关注与数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统。

ActiveMQ - 基础篇_消息中间件_06ActiveMQ - 基础篇_ActiveMQ_07ActiveMQ - 基础篇_ActiveMQ_08ActiveMQ - 基础篇_消息中间件_09ActiveMQ - 基础篇_JMS_10

ActiveMQ - 基础篇_JMS_11ActiveMQ - 基础篇_ActiveMQ_12ActiveMQ - 基础篇_JMS_13ActiveMQ - 基础篇_JMS_14ActiveMQ - 基础篇_ActiveMQ_15ActiveMQ - 基础篇_JMS_16


三、规范

ActiveMQ - 基础篇_JMS_17ActiveMQ - 基础篇_JMS_18ActiveMQ - 基础篇_JMS_18ActiveMQ - 基础篇_消息中间件_20ActiveMQ - 基础篇_JMS_21ActiveMQ - 基础篇_ActiveMQ_22ActiveMQ - 基础篇_ActiveMQ_23

ActiveMQ - 基础篇_ActiveMQ_24ActiveMQ - 基础篇_JMS_25


四、ActiveMQ

4.1、下载安装(Win)

ActiveMQ - 基础篇_消息中间件_26

Ps1:直接启动:找到 activemq.bat 启动(推荐管理员方式运行)。

Ps2:使用服务启动:找到 InstallService.bat 启动(推荐管理员方式运行)。

ActiveMQ - 基础篇_ActiveMQ_27

4.2、队列模式

AppProducer 类

package com.myimooc.jms.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* App 生产者-队列模式
*/
public class AppProducer {

// 指定ActiveMQ服务的地址
private static final String URL = "tcp://127.0.0.1:61616";

// 指定队列的名称
private static final String QUEUE_NAME = "queue-test";

public static void main(String[] args) throws JMSException {

// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

// 2.创建Connection
Connection connection = connectionFactory.createConnection();

// 3.启动连接
connection.start();

// 4.创建会话(第一个参数:是否在事务中处理)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 5. 创建一个目标
Destination destination = session.createQueue(QUEUE_NAME);

// 6.创建一个生产者
MessageProducer producer = session.createProducer(destination);

for (int i = 0; i < 100; i++) {

// 7.创建消息
TextMessage textMessage = session.createTextMessage("test" + i);

// 8.发布消息
producer.send(textMessage);

System.out.println("消息发送:" + textMessage.getText());
}

// 9.关闭连接
connection.close();
}

}

AppConsumer 类

package com.myimooc.jms.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* App 消费者-队列模式
*/
public class AppConsumer {

/** 指定ActiveMQ服务的地址 */
private static final String URL = "tcp://127.0.0.1:61616";

/** 指定队列的名称 */
private static final String QUEUE_NAME = "queue-test";

public static void main(String[] args) throws JMSException {

// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

// 2.创建Connection
Connection connection = connectionFactory.createConnection();

// 3.启动连接
connection.start();

// 4.创建会话(第一个参数:是否在事务中处理)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 5.创建一个目标
Destination destination = session.createQueue(QUEUE_NAME);

// 6.创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);

// 7.创建一个监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收消息:" + textMessage.getText());
} catch (JMSException e) {
System.out.println("接收消息异常:");
e.printStackTrace();
}
}
});

// 8.关闭连接
//connection.close();
}

}

JMS队列模式:

消息发布出去后,只要有消费者消费就算OK,不存在消费者要先订阅(启动监听)的问题。


4.3、主题模式

AppProducer 类

package com.myimooc.jms.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* App 生产者-主题模式
*/
public class AppProducer {

/** 指定ActiveMQ服务的地址 */
private static final String URL = "tcp://127.0.0.1:61616";

/** 指定主题的名称 */
private static final String TOPIC_NAME = "topic-test";

public static void main(String[] args) throws JMSException {

// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

// 2.创建Connection
Connection connection = connectionFactory.createConnection();

// 3.启动连接
connection.start();

// 4.创建会话(第一个参数:是否在事务中处理)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 5. 创建一个目标
Destination destination = session.createTopic(TOPIC_NAME);

// 6.创建一个生产者
MessageProducer producer = session.createProducer(destination);

for (int i = 0; i < 100; i++) {

// 7.创建消息
TextMessage textMessage = session.createTextMessage("test" + i);

// 8.发布消息
producer.send(textMessage);

System.out.println("消息发送:" + textMessage.getText());
}

// 9.关闭连接
connection.close();
}

}

AppConsumer 类

package com.myimooc.jms.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* App 消费者-主题模式
*/
public class AppConsumer {

/** 指定ActiveMQ服务的地址 */
private static final String URL = "tcp://127.0.0.1:61616";

/** 指定主题的名称 */
private static final String TOPIC_NAME = "topic-test";

public static void main(String[] args) throws JMSException {

// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

// 2.创建Connection
Connection connection = connectionFactory.createConnection();

// 3.启动连接
connection.start();

// 4.创建会话(第一个参数:是否在事务中处理)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 5.创建一个目标
Destination destination = session.createTopic(TOPIC_NAME);

// 6.创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);

// 7.创建一个监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收消息:" + textMessage.getText());
} catch (JMSException e) {
System.out.println("接收消息异常:");
e.printStackTrace();
}
}
});

// 8.关闭连接
//connection.close();
}

}

JMS主题模式:

“订阅者先订阅,发布者后发布消息  ---导致--> 订阅者才能收到消息"

就个人理解,先启动订阅者就是先于发布者监听目标队列,其次再由发布者向目标队列发送消息,这样订阅者才会收到信息。如果在订阅前先发布消息再订阅,那么之前的消息收不到,订阅之后的消息还能收到。


举报

相关推荐

0 条评论