0
点赞
收藏
分享

微信扫一扫

【ActiveMQ入门-11】ActiveMQ学习-compositeDestination

概要:

前一章讲解了消费者如何通过通配符来匹配目的地,以实现一个消费者同时接收多个目的地的消息。对于生产者来讲,可能存在下面的需求:

1. 同一条message可能要发送到多个Queue; 2. 同一条message同时发送到Queue和Topic;等 这时,我们可以使用composite Destination来解决。 官方文章:​​http://activemq.apache.org/composite-destinations.html​​   


下面将介绍如何将message发送到多个Queue、以及将message同时发送到Queue和Topic。


环境:

  1. JmsMessageListener.java
  2. Sender.java
  3. applicationContext-compositeDestination.xml

【ActiveMQ入门-11】ActiveMQ学习-compositeDestination_ActiveMQ【ActiveMQ入门-11】ActiveMQ学习-compositeDestination_ActiveMQ_02



方式1:同时向多个Queue中发送相同的消息

源文件和配置文件:

JmsMessageListener.java 异步接收消息

package com.ll.compositeDestination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;public class JmsMessageListener implements MessageListener { public void onMessage(Message message) {    System.out.println("消息全部内容:" + message.toString());         try {     System.out.println("消息主题:" + message.getJMSDestination().toString());   } catch (JMSException e1) {     e1.printStackTrace();   }       TextMessage tm = (TextMessage) message;     try {     System.out.println("消息体:" + tm.getText());    } catch (JMSException e) {      e.printStackTrace();    }     System.out.println("------------------------------------"); }}


Sender.java

package com.ll.compositeDestination;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Queue;import javax.jms.Session;import org.apache.activemq.command.ActiveMQQueue;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;public class Sender { public static void main(String[] args) {    ApplicationContext applicationContext = new ClassPathXmlApplicationContext(       "applicationContext-compositeDestination.xml");   JmsTemplate template = (JmsTemplate) applicationContext       .getBean("jmsTemplate");         Destination destination =(Destination) applicationContext      .getBean("destinationProducer");    //    Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");       //发送消息    template.send(destination, new MessageCreator() {     public Message createMessage(Session session) throws JMSException {       return session            .createTextMessage("同时向三个Queue中发送相同的消息");     }   });   System.out.println("同时向三个Queue中发送相同的消息-发送完成..."); }}

applicationContext-compositeDestination.xml

 version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://www.springframework.org/schema/beans    http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">    <property name="brokerURL" value="tcp://localhost:61616">property>  bean>    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">    <constructor-arg index="0" value="FOO.*">constructor-arg>  bean>    <bean id="destinationProducer" class="org.apache.activemq.command.ActiveMQQueue">    <constructor-arg index="0" value="FOO.A,FOO.B,FOO.C,FOO.D">constructor-arg>  bean>  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">    <property name="connectionFactory" ref="connectionFactory">property>    <property name="defaultDestination" ref="destinationProducer">property>    <property name="receiveTimeout" value="600">property>  bean>    <bean id="jmsMessageListener" class="com.ll.compositeDestination.JmsMessageListener">  bean>    <bean id="consumer"    class="org.springframework.jms.listener.DefaultMessageListenerContainer">    <property name="connectionFactory" ref="connectionFactory" />    <property name="destination" ref="destination" />    <property name="messageListener" ref="jmsMessageListener" />  bean>beans>


运行结果:

【ActiveMQ入门-11】ActiveMQ学习-compositeDestination_java_03


方式2:同时向Queue中发送相同的消息

环境、jar包和方式1 相同;

总共3个文件:

  1. JmsMessageListener.java
  2. Sender.java
  3. applicationContext-compositeDestination.xml

JmsMessageListener.java

package com.ll.compositeDestination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;public class JmsMessageListener implements MessageListener { public void onMessage(Message message) {    System.out.println("消息全部内容:" + message.toString());         try {     System.out.println("消息主题:" + message.getJMSDestination().toString());   } catch (JMSException e1) {     e1.printStackTrace();   }       TextMessage tm = (TextMessage) message;     try {     System.out.println("消息体:" + tm.getText());    } catch (JMSException e) {      e.printStackTrace();    }     System.out.println("------------------------------------"); }}


Sender.java

package com.ll.compositeDestination;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Queue;import javax.jms.Session;import org.apache.activemq.command.ActiveMQQueue;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;public class Sender { public static void main(String[] args) {    ApplicationContext applicationContext = new ClassPathXmlApplicationContext(       "applicationContext-compositeDestination.xml");   JmsTemplate template = (JmsTemplate) applicationContext       .getBean("jmsTemplate");    Destination destination = (Destination) applicationContext        .getBean("destinationProducer");        try {     Thread.sleep(3000);   } catch (InterruptedException e) {      e.printStackTrace();    }   // 发送消息   template.send(destination, new MessageCreator() {     public Message createMessage(Session session) throws JMSException {       return session.createTextMessage("同时向多个Queue、Topic中发送相同的消息");     }   }); }}


applicationContext-compositeDestination.xml

 version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://www.springframework.org/schema/beans    http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">    <property name="brokerURL" value="tcp://localhost:61616">property>  bean>    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">    <constructor-arg index="0" value="FOO.*">constructor-arg>  bean>    <bean id="destination2" class="org.apache.activemq.command.ActiveMQTopic">    <constructor-arg index="0" value="NOTIFY.FOO.*">constructor-arg>  bean>    <bean id="destinationProducer" class="org.apache.activemq.command.ActiveMQQueue">    <constructor-arg index="0"      value="FOO.1,FOO.2,FOO.3,FOO.4,topic://NOTIFY.FOO.D,topic://NOTIFY.FOO.E,topic://NOTIFY.FOO.F">constructor-arg>  bean>    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">    <property name="connectionFactory" ref="connectionFactory">property>    <property name="defaultDestination" ref="destinationProducer">property>    <property name="receiveTimeout" value="600">property>  bean>    <bean id="jmsMessageListener" class="com.ll.compositeDestination.JmsMessageListener">  bean>    <bean id="consumer"    class="org.springframework.jms.listener.DefaultMessageListenerContainer">    <property name="connectionFactory" ref="connectionFactory" />    <property name="destination" ref="destination" />    <property name="messageListener" ref="jmsMessageListener" />  bean>    <bean id="consumer2"    class="org.springframework.jms.listener.DefaultMessageListenerContainer">    <property name="connectionFactory" ref="connectionFactory" />    <property name="destination" ref="destination2" />    <property name="messageListener" ref="jmsMessageListener" />  bean>beans>


运行结果:

【ActiveMQ入门-11】ActiveMQ学习-compositeDestination_spring_04【ActiveMQ入门-11】ActiveMQ学习-compositeDestination_ActiveMQ+spring_05

从上面两张图可以看出,发送到多个Queue中的消息全部被消费了,但是发送到多个Topic中的消息, 有且只有一个Topic的消息被消费了,其他的消息都没有被消费,为什么?? 个人理解: 因为是同时发送到多个Queue和Topic中的(注意是同时,相同时刻),而消费者采用异步接收方式, 当所有消息都到达时,onMessage函数处理不过来,最多只能处理一个。 之所以Queue全部被消费了,而Topic只有1个被消费,是因为没有立即被消费的Queue消息, 会一直保存在MQ服务器(Queue消息:生产者和消费者没有时间依赖性),而Topic消息:生产者和消费者有时间依赖性, 没有被及时消费掉的消息,就再也没有机会消费了。


举报

相关推荐

0 条评论