开始使用ActiveMQ
1、JDK下载和安装
2、ANT下载和安装
3、ActiveMQ下载和安装
(1)下载地址: http://activemq.apache.org/download-archives.html
(2)解压apache-activemq-5.4.3-bin.zip
4、监控ActiveMQ:
我们可以通过Web界面来监控ActiveMQ,用浏览器访问:http://localhost:8161/admin
如果ActiveMQ的版本是
5.4,则需要输入默认账号admin/admin,该账号可以在conf/jetty-realm.properties中配置
ActiveMQ的概念
(1)JMS是J2EE体系标准的一部分,规定了应用之间同步、异步进行消息发送、接受的相关规范。ActiveMQ是实现了JMS标准的消息服务器,ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线
(2)client:ActiveMQ的client包括producer和consumer两类,从名字可看出,producer是产生消息的消息生产者,consumer是接收消息的消息消费者,
现实中producer和consumer是两个应用程,它们之间通过ActiveMQ进行通信
(3)destination:发送消息的目标,接收消息的来源。包括两类,queue和topic。queue中的消息只能一次性消费,topic中的消息可以被所有订阅者同时消费
(4)持久化:ActiveMQ支持持久化,可以将接收到的消息保存到数据库中,就算ActiveMQ重启,也照样能将尚未派发的消息发送出去。
(5)异步:ActiveMQ最大的特点就是异步,这也是和webservice最大的差别,发送者只需将消息发送给ActiveMQ,剩下的事交给ActiveMQ就行,发送者不用关心。
ActiveMQ的目标
是在尽可能多的平台和语言上提供一个标准的,消息驱动的应用集成。ActiveMQ实现JMS规范并在此之上提供大量额外的特性。
基本特性
(1)连接----ActiveMQ提供各种连接选择,包括HTTP,HTTPS,IP多点传送,SSL,STOMP,TCP,UDP,XMPP等
(2)可插拔的持久性和安全----ActiveMQ提供多种持久性方案可供选择,也可以完全按自己需求定制验证和授权
(3)用Java建立消息驱动应用----ActiveMQ最常用在Java应用中,用于发送和接收消息
(4)与应用服务器集成----ActiveMQ与java应用服务器集成是很常见的
(5)客户端APIs----ActiveMQ对多种语言提供客户端API,可以通过ActiveMQ提供的客户端API使用ActiveMQ的全部特性
(6)代理器集群(Broker clustering)----为了利于扩展,多个ActiveMQ broker能够联合工作
(7)高级代理器特性和客户端选项----ActiveMQ为代理器和客户端连接提供很多高级的特性
(8)简单的管理----ActiveMQ是为开发者设计的
消息的同步和异步接收
(1)消息的同步接收
是指客户端主动去接收消息,客户端可以采用MessageConsumer的receive()方法去接收下一个消息
(2)消息的异步接收
是指当消息到达时,ActiveMQ主动通知客户端。客户端可以通过注册一个实现MessageListener 接口的对象到MessageConsumer。
MessageListener只有一个必须实现的方法 onMessag(),它只接收一个参数,即Message。在为每个发送到Destination的消息实现onMessage时,将调用该方法
MessageConsumer message = session.createConsumer(destination);
//消息的异步接收:客户端可以通过注册一个实现MessageListener 接口的对象到MessageConsumer
message.setMessageListener(new MessageListener(){
public void onMessage(Message msg) {
MapMessage message = (MapMessage)msg;
try {
System.out.println("--Consumer收到消息:"+new Date(message.getLong("count")));
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
ActiveMQ的通讯方式
(1)mq可以实现点对点
(2)订阅发布
(3)集群
为什么会需要消息队列(MQ)?
主要原因是由于在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达mysql,
直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。
代码演示
(1)消费者
package com.mq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSConsumer {
public static void main(String[] args) {
String user = ActiveMQConnection.DEFAULT_USER;
String password = ActiveMQConnection.DEFAULT_PASSWORD;
String url = ActiveMQConnection.DEFAULT_BROKER_URL;
String subject = "myQueue";
//ConnectionFactory 是连接工厂,负责创建Connection
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);
Connection connection = null;
try {
//创建连接
connection = connectionFactory.createConnection();
connection.start();
//创建会话 Connection 负责创建 Session
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//Destination是一个客户端用来指定生产消息目标和消费消息来源的对象。
Destination destination = session.createQueue(subject);
MessageConsumer message = session.createConsumer(destination);
//消息的异步接收:客户端可以通过注册一个实现MessageListener 接口的对象到MessageConsumer
message.setMessageListener(new MessageListener(){
public void onMessage(Message msg) {
MapMessage message = (MapMessage)msg;
try {
System.out.println("--Consumer收到消息:"+message.getObject("userName"));
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
try {
Thread.sleep(30000);
session.close();
connection.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
(2)生产者
package com.mq;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSProducer {
public static void main(String[] args) {
String user = ActiveMQConnection.DEFAULT_USER;
String password = ActiveMQConnection.DEFAULT_PASSWORD;
String url = ActiveMQConnection.DEFAULT_BROKER_URL;
String subject = "myQueue";
ConnectionFactory contectionFactory = new ActiveMQConnectionFactory(user,password,url);
try {
Connection connection = contectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(subject);
//MessageProducer是一个由Session创建的对象,用来向Destination发送消息
MessageProducer producer = session.createProducer(destination);
for(int i = 0;i<=20;i++){
MapMessage message = session.createMapMessage();
message.setObject("userName", "張三" +i);
Thread.sleep(1000);
producer.send(message);
System.out.println("--Producer发送消息"+i+":"+new Date());
}
session.commit();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
参考文章:
1、 ActiveMQ使用线程池实现消息的生产与消费
2、ActiveMQ开发手册和JMS ActiveMQ
http://wenku.baidu.com/link?url=2FR10zhMvEYg5IG-f22QV5W8yuTIBcc5YWbYe5dpJip94og0D62VU05arkZnWpLjK9unsLoqJUiZFVDdlqAezvucC6kcHHnlQGfCx0zamlW
http://wenku.baidu.com/link?url=_vlxnxKa0IsBDCqSV8IHjH9wn14_iEuvO4IDJWs2rWRGunbnXA68H4iSLMY-k1UubAPEsK2tq7QuKGH8DClJxENdyitwmaIzKmP5gQdXMr7
3、ActiveMQ入门实例