linux上安装activeMQ [url]http://blog.163.com/yangzhanghui_job/blog/static/17957506220127171173225/[/url]
ActiveMQ-5.8.0 安装和启动 [url]http://chenzhou123520.iteye.com/blog/1915287[/url]
在CentOS Linux下部署Activemq 5 [url]http://sunbean.blog.51cto.com/972509/675529[/url]
运行环境:jdk1.6 ,javaEE5 , spring2.5 ,activeMQ5.4.3.
一定要注意activeMQ的版本与jdk的兼容性,[color=red]最新的activeMQ版本估计要在jdk1.7以上才能运行。[/color]
先说一下activeMQ的安装:
1、下载:[url]http://activemq.apache.org/download.html[/url] 选择合适的Windows版本
2、安装
(1) 首先配置JAVA环境变量
JAVA_HOME=D:\Program Files\Java\jdk1.5.0
CLASSPAHT=.;%JAVA_HOME%\lib
PATH=%JAVA_HOME%\bin;
(2)直接解压至任意目录(例如:D:\apache-activemq-5.3.0)
3、启动ActiveMQ服务器:直接运行\bin\win32\activemq.bat
当运行成功后,界面显示: Started SelectChannelConnector@0.0.0.0:8161 即说明activemq启动成功。
4、打开ActiveMQ消息管理后台系统 [url]http://localhost:8161/admin/[/url]
需要依赖的jar包有:[b]spring.jar , activemq-all-5.4.3.jar , commons-logging-api-1.1.jar , commons-io-1.3.2.jar[/b]
好了,准备工作做完后,开始上代码,
先看一下,我们最终的Spring配置文件applicationContext.xml的内容,如下所示:
<?xml version="1.0" encoding="UTF-8"?>
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:jee="http://www.springframework.org/schema/jee"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">
<!-- jms 连接工厂 -->
<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- 配置代理的地址,即配置activeMQ的连接URI,
让jms工厂能够连接到activeMQ服务器(将activeMQ暴露给客户端使用,
负责客户端与activeMQ之间的连接通信) -->
<property name="brokerURL">
<value>tcp://localhost:61616</value><!-- 一种标准URI地址,意思是说标识一个本地的端口号位61616的TCP连接(其中,"61616"是activeMQ默认的连接端口号) -->
</property>
</bean>
<!-- ActiveMQ连接器将这种简单等级结构的URI模式称为低等级的连接器(low-levelconnectors),
并为这些连接器实现了基本的网络通信协议。低等级连接器URIs使用主题(scheme)标识底层使用的网络协议,
使用路径元素定位网络资源服务(一般为主机名加上端口号),使用查询元素用来确定连接器附加信息。 -->
<!-- jms 连接池 -->
<!--
<bean id="pooledJmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory">
<ref local="jmsFactory" />
</property>
</bean>
-->
<!-- jms 模板 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref local="jmsFactory" />
</property>
</bean>
<!-- jms Topic -->
<bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic"
autowire="constructor">
<constructor-arg value="STOCKS.JAVA" />
</bean>
<!-- jms Consumer -->
<bean id="javaConsumer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory" />
<property name="destination" ref="myTopic" />
<property name="messageListener" ref="myTextListener" />
</bean>
<!-- 消息监听器 -->
<bean id="myTextListener" class="demo.TextListener">
</bean>
<!-- 消息发布器 -->
<bean id="springPublisher" class="demo.SpringPublisher">
<property name="template">
<ref local="jmsTemplate" />
</property>
<property name="topic">
<ref local="myTopic" />
</property>
</bean>
</beans>
接下来,消息生成器代码,实现spring的MessageCreator接口:
package demo;
import java.util.Date;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.core.MessageCreator;
public class MyMessageCreator implements MessageCreator {
/**
* 消息序号
*/
private int msgNo;
public MyMessageCreator(int no) {
this.msgNo = no;
}
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMsg = session.createTextMessage();
textMsg.setText(new Date() + "第" + this.msgNo + "条消息发出");
return textMsg;
}
}
接下来,消息监听器,实现javaEE的规范MessageListener接口即可,因为要注入到spring的DefaultMessageListenerContainer中。此监听器通过监听来自destination(在spring中配置)的消息,一旦有消息就打印出来:
package demo;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class TextListener implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage msg = null;
try {
if (message instanceof TextMessage) {
msg = (TextMessage) message;
System.out.println("Reading message: " + msg.getText());
} else {
System.out.println("Message of wrong type: "
+ message.getClass().getName());
}
} catch (JMSException e) {
System.out.println("JMSException in onMessage(): " + e.toString());
} catch (Throwable t) {
System.out.println("Exception in onMessage():" + t.getMessage());
}
}
}
下面是消息发布器,通过spring的jms模板,即可轻松的获得与activeMQ的连接与通信,从而获得Connection和Destination,再通过JmsTemplate的send方法,即可发送消息到指定的destination(在spring中配置)中,以供客户端接收:
package demo;
import javax.jms.Destination;
import org.springframework.jms.core.JmsTemplate;
public class SpringPublisher {
/**
* Jms模板
*/
private JmsTemplate template;
/**
* Topic
*/
private Destination topic;
public JmsTemplate getTemplate() {
return template;
}
public void setTemplate(JmsTemplate template) {
this.template = template;
}
public Destination getTopic() {
return topic;
}
public void setTopic(Destination topic) {
this.topic = topic;
}
/**
* Start
*
* @throws InterruptedException
*/
public void start() throws InterruptedException {
int messageCount = 10;
while ((--messageCount) > 0) {
sendMessage(messageCount);
Thread.sleep(1000);
}
}
/**
* 消息发送
*/
protected void sendMessage(int msgNO) {
this.template.send(this.topic, new MyMessageCreator(msgNO));
}
}
至此,基本的jms就已经搭建好了,很简单吧,一个spring上下文配置,一个消息生成器,一个消息发布器,一个监听器,搞定。接下来,编写一个测试类,看运行结果(注意在运行测试类前,一定要先启动activeMQ服务器):
package test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import demo.SpringPublisher;
public class SpringJmsTestMain {
/**
* @param args
*/
public static void main(String[] args) throws InterruptedException {
ApplicationContext context = new ClassPathXmlApplicationContext(
new String[] { "applicationContext.xml" });
SpringPublisher publisher = (SpringPublisher) context
.getBean("springPublisher");
publisher.start();
}
}
运行结果如下:
[color=gray]Reading message: Sun Jun 28 19:40:05 CST 2015第9条消息发出
Reading message: Sun Jun 28 19:40:06 CST 2015第8条消息发出
Reading message: Sun Jun 28 19:40:07 CST 2015第7条消息发出
Reading message: Sun Jun 28 19:40:08 CST 2015第6条消息发出
Reading message: Sun Jun 28 19:40:09 CST 2015第5条消息发出
Reading message: Sun Jun 28 19:40:10 CST 2015第4条消息发出
Reading message: Sun Jun 28 19:40:12 CST 2015第3条消息发出
Reading message: Sun Jun 28 19:40:13 CST 2015第2条消息发出
Reading message: Sun Jun 28 19:40:14 CST 2015第1条消息发出[/color]