Topic消费模型
 
*  通配符模型
*      生产者必须指定完整且准确的路由key
*      消费者可以使用通配符
*          *:可以替代一级的任意字符 add.*     ==>     add.user                add.goods
*          #:可以替代多级的任意字符 add.#     ==>     add.user.name           add.user.name.firstName
 
生产者
 
package com.example.demo02.mq.topic;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
public class TopicSender {
    public static void main(String[] args) throws Exception {
        
        Connection connection = ConnectionUtils.getConnection();
        
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC,false);
        
        String msg1 = "商品新增了,Topic模型,routing key 为 goods.add";
        String msg2 = "商品修改了,Topic模型,routing key 为 goods.update";
        String msg3 = "商品删除了,Topic模型,routing key 为 goods.delete";
        String msg4 = "用户新增了,Topic模型,routing key 为 user.add";
        String msg5 = "用户修改了,Topic模型,routing key 为 user.update";
        String msg6 = "用户删除了,Topic模型,routing key 为 user.delete";
        String msg7 = "添加了用户名字,Topic模型,routing key 为 user.add.name";
        String msg8 = "添加了用户年龄,Topic模型,routing key 为 user.add.age";
        String msg9 = "修改了用户名字,Topic模型,routing key 为 user.update.name";
        String msg10 = "修改了用户年龄,Topic模型,routing key 为 user.update.age";
        channel.basicPublish("topic.exchange","goods.add",null,msg1.getBytes());
        channel.basicPublish("topic.exchange","goods.update",null,msg2.getBytes());
        channel.basicPublish("topic.exchange","goods.delete",null,msg3.getBytes());
        channel.basicPublish("topic.exchange","user.add",null,msg4.getBytes());
        channel.basicPublish("topic.exchange","user.update",null,msg5.getBytes());
        channel.basicPublish("topic.exchange","user.delete",null,msg6.getBytes());
        channel.basicPublish("topic.exchange","user.add.name",null,msg7.getBytes());
        channel.basicPublish("topic.exchange","user.add.age",null,msg8.getBytes());
        channel.basicPublish("topic.exchange","user.update.name",null,msg9.getBytes());
        channel.basicPublish("topic.exchange","user.update.age",null,msg10.getBytes());
        
        channel.close();
        connection.close();
    }
}
 
消费者1
 
package com.example.demo02.mq.topic;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class TopicReceiver1 {
    public static void main(String[] args) throws Exception {
        
        Connection connection = ConnectionUtils.getConnection();
        
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC,false);
        
        channel.queueDeclare("topic.queue1", false, false, false, null);
        
        channel.queueBind("topic.queue1", "topic.exchange", "goods.*");
        
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("商品模块接收到的消息是:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("topic.queue1",false,consumer);
    }
}
 
消费者2
 
package com.example.demo02.mq.topic;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class TopicReceiver2 {
    public static void main(String[] args) throws Exception {
        
        Connection connection = ConnectionUtils.getConnection();
        
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC,false);
        
        channel.queueDeclare("topic.queue2", false, false, false, null);
        
        channel.queueBind("topic.queue2", "topic.exchange", "user.#");
        
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("用户模块接收到的消息是:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("topic.queue2",false,consumer);
    }
}
 
结果
 
