概要:
前一章讲解了消费者如何通过通配符来匹配目的地,以实现一个消费者同时接收多个目的地的消息。对于生产者来讲,可能存在下面的需求:
1. 同一条message可能要发送到多个Queue; 2. 同一条message同时发送到Queue和Topic;等 这时,我们可以使用composite Destination来解决。 官方文章:http://activemq.apache.org/composite-destinations.html
下面将介绍如何将message发送到多个Queue、以及将message同时发送到Queue和Topic。
环境:
- JmsMessageListener.java
- Sender.java
- applicationContext-compositeDestination.xml
方式1:同时向多个Queue中发送相同的消息
源文件和配置文件:
JmsMessageListener.java 异步接收消息
package com.ll.compositeDestination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;public class JmsMessageListener implements MessageListener { public void onMessage(Message message) { System.out.println("消息全部内容:" + message.toString()); try { System.out.println("消息主题:" + message.getJMSDestination().toString()); } catch (JMSException e1) { e1.printStackTrace(); } TextMessage tm = (TextMessage) message; try { System.out.println("消息体:" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } System.out.println("------------------------------------"); }}
Sender.java
package com.ll.compositeDestination;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Queue;import javax.jms.Session;import org.apache.activemq.command.ActiveMQQueue;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;public class Sender { public static void main(String[] args) { ApplicationContext applicationContext = new ClassPathXmlApplicationContext( "applicationContext-compositeDestination.xml"); JmsTemplate template = (JmsTemplate) applicationContext .getBean("jmsTemplate"); Destination destination =(Destination) applicationContext .getBean("destinationProducer"); // Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C"); //发送消息 template.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session .createTextMessage("同时向三个Queue中发送相同的消息"); } }); System.out.println("同时向三个Queue中发送相同的消息-发送完成..."); }}
applicationContext-compositeDestination.xml
version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd"> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616">property> bean> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="FOO.*">constructor-arg> bean> <bean id="destinationProducer" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="FOO.A,FOO.B,FOO.C,FOO.D">constructor-arg> bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory">property> <property name="defaultDestination" ref="destinationProducer">property> <property name="receiveTimeout" value="600">property> bean> <bean id="jmsMessageListener" class="com.ll.compositeDestination.JmsMessageListener"> bean> <bean id="consumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="destination" /> <property name="messageListener" ref="jmsMessageListener" /> bean>beans>
运行结果:
方式2:同时向Queue中发送相同的消息
环境、jar包和方式1 相同;
总共3个文件:
- JmsMessageListener.java
- Sender.java
- applicationContext-compositeDestination.xml
JmsMessageListener.java
package com.ll.compositeDestination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;public class JmsMessageListener implements MessageListener { public void onMessage(Message message) { System.out.println("消息全部内容:" + message.toString()); try { System.out.println("消息主题:" + message.getJMSDestination().toString()); } catch (JMSException e1) { e1.printStackTrace(); } TextMessage tm = (TextMessage) message; try { System.out.println("消息体:" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } System.out.println("------------------------------------"); }}
Sender.java
package com.ll.compositeDestination;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Queue;import javax.jms.Session;import org.apache.activemq.command.ActiveMQQueue;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;public class Sender { public static void main(String[] args) { ApplicationContext applicationContext = new ClassPathXmlApplicationContext( "applicationContext-compositeDestination.xml"); JmsTemplate template = (JmsTemplate) applicationContext .getBean("jmsTemplate"); Destination destination = (Destination) applicationContext .getBean("destinationProducer"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } // 发送消息 template.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage("同时向多个Queue、Topic中发送相同的消息"); } }); }}
applicationContext-compositeDestination.xml
version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd"> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616">property> bean> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="FOO.*">constructor-arg> bean> <bean id="destination2" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="NOTIFY.FOO.*">constructor-arg> bean> <bean id="destinationProducer" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="FOO.1,FOO.2,FOO.3,FOO.4,topic://NOTIFY.FOO.D,topic://NOTIFY.FOO.E,topic://NOTIFY.FOO.F">constructor-arg> bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory">property> <property name="defaultDestination" ref="destinationProducer">property> <property name="receiveTimeout" value="600">property> bean> <bean id="jmsMessageListener" class="com.ll.compositeDestination.JmsMessageListener"> bean> <bean id="consumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="destination" /> <property name="messageListener" ref="jmsMessageListener" /> bean> <bean id="consumer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="destination2" /> <property name="messageListener" ref="jmsMessageListener" /> bean>beans>
运行结果:
从上面两张图可以看出,发送到多个Queue中的消息全部被消费了,但是发送到多个Topic中的消息, 有且只有一个Topic的消息被消费了,其他的消息都没有被消费,为什么?? 个人理解: 因为是同时发送到多个Queue和Topic中的(注意是同时,相同时刻),而消费者采用异步接收方式, 当所有消息都到达时,onMessage函数处理不过来,最多只能处理一个。 之所以Queue全部被消费了,而Topic只有1个被消费,是因为没有立即被消费的Queue消息, 会一直保存在MQ服务器(Queue消息:生产者和消费者没有时间依赖性),而Topic消息:生产者和消费者有时间依赖性, 没有被及时消费掉的消息,就再也没有机会消费了。