0
点赞
收藏
分享

微信扫一扫

08-RabbitMQ核心API-Topic Exchange

登高且赋 2022-10-08 阅读 206

Topic Exchange

简介

  • 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
  • Exchange将RouteKey和某Topic进行模糊匹配, 此时队列需要绑定一个Topic
  • 可以使用通配符进行模糊匹配
  • #: 匹配一个或多个词
  • *: 匹配不多不少一个词
  • log.# : 能够匹配 log.info.oa
  • log.* : 只会匹配 log.error

08-RabbitMQ核心API-Topic Exchange_回调函数

代码实现

消费者1

package com.dance.redis.mq.rabbit.topic;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class Receiver4TopicExchange1 {

public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String exchangeName = "test_topic_exchange";
String queueName = "test_topic_queue";
// String routingKey = "user.*";
String routingKey = "user.#";
RabbitMQHelper.exchangeDeclare(channel,exchangeName,RabbitMQHelper.EXCHANGE_TYPE_TOPIC);
RabbitMQHelper.queueDeclare(channel, queueName);
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
System.err.println("consumer1 start.. ");
System.out.println("recvive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, consumer);
//等待回调函数执行完毕之后,关闭资源。
TimeUnit.SECONDS.sleep(50);
channel.close();
RabbitMQHelper.closeConnection();

}
}

消费者2

package com.dance.redis.mq.rabbit.topic;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class Receiver4TopicExchange2 {

public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey = "user.*";
// String routingKey = "user.#";
RabbitMQHelper.exchangeDeclare(channel,exchangeName,RabbitMQHelper.EXCHANGE_TYPE_TOPIC);
RabbitMQHelper.queueDeclare(channel,queueName);
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
System.err.println("consumer2 start.. ");
System.out.println("recvive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, consumer);
//等待回调函数执行完毕之后,关闭资源。
TimeUnit.SECONDS.sleep(50);
channel.close();
RabbitMQHelper.closeConnection();
}
}

生产者

package com.dance.redis.mq.rabbit.topic;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Sender4TopicExchange {


public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
RabbitMQHelper.closeConnection();
}

}

测试

测试#规则

08-RabbitMQ核心API-Topic Exchange_java_02

确定目前是木有binding规则的, 如果没有启动过, 就不用看

启动消费者1

08-RabbitMQ核心API-Topic Exchange_回调函数_03

启动生产者

08-RabbitMQ核心API-Topic Exchange_java_04

查看消费者1

08-RabbitMQ核心API-Topic Exchange_redis_05

三条消息全部被消费, 可以看到#是可以匹配多个词的

测试*规则

08-RabbitMQ核心API-Topic Exchange_回调函数_06

先去把之前的规则解绑

恢复到什么都没有的状态

启动消费者2

08-RabbitMQ核心API-Topic Exchange_回调函数_07

启动生产者

08-RabbitMQ核心API-Topic Exchange_java_08

查看消费者2

08-RabbitMQ核心API-Topic Exchange_java_09

可以看到3条消息只有2条消息被消费了, 所以*是只能匹配一个词的, 多个词的没有匹配到



举报

相关推荐

0 条评论