0
点赞
收藏
分享

微信扫一扫

RabbitMQ六种模式、消息应答、持久化、消息分发

代码演示

1:HelloWorld 简单模式(交换机都用默认--绑定RoutingKey--队列)

消费者通过监听器主动获取消息

创建maven项目,加入依赖

<!--指定jdk编译版本-->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

<dependencies>
    <!--rabbitmq依赖客户端-->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>
    <!--操作文件流的一个依赖-->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
</dependencies>

查看mq端口号 默认5672

 ps -ef |grep rabbit

创建生产者类,编写代码

package com.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//简单模式
public class Producer {
    //队列名
    public static final String queue_name = "hello";
    //发消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //ip
        connectionFactory.setHost("192.168.6.100");
        //port  不写也会走默认
//        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        /**
         * 生成一个队列
         * 1.队列名称
         * 2.队列里面的消息是否持久化 默认消息存储在内存中
         * 3.该队列是否只供一个消费者进行消费 是否进行共享 true可以多个消费者消费
         * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
         * 5.其他参数
         */
        channel.queueDeclare(queue_name,false,false,false,null);
		 String message="hello world";
        /**
         * 发送一个消息
         * 1.发送到那个交换机  ""默认
         * 2.路由的key是哪个
         * 3.其他的参数信息
         * 4.发送消息的消息体
         */
        channel.basicPublish("",queue_name,null,message.getBytes());
        System.out.println("消息发送完毕");
        channel.close();
        connection.close();
    }
}

消费者

package com.mq;

import com.rabbitmq.client.*;

//收消息  不管信道与连接
public class Consumer {
    //与生产者一样名称
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.6.100");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        System.out.println("等待接收消息....");
        //推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String message= new String(delivery.getBody());
            System.out.println(message);
        };
        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true代表自动应答 false手动应答
         * 3.消费者未成功消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }

}

2:Work Queues 工作模式

对比于简单模式出现多个消费者

 消息应答

自动应答:速度高效率快,会出现消息丢失

手动应答:队列中会保存消息,消费者处理完后才会应答

修改之前代码

//推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String message= new String(delivery.getBody());
            System.out.println(message);
            //。。。其他代码
            //参数:1.标识,2.是否批量
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
         //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true代表自动应答 false手动应答
         * 3.消费者未成功消费的回调
         */
        //手动应答  消息失败可重发
        boolean autoAsk = false;
        channel.basicConsume(QUEUE_NAME,autoAsk,deliverCallback,cancelCallback);
        //上面代码都执行完了,才能应答对垒   队列里才会删消息

当两个消费者时正常会轮询接受消息,一个消费者出现异常,则另一个消费者会接受他的消息(消息自动重新入队)

持久化

保障当RabbitMQ服务停掉以后消息生产者发送过来的消息不丢失

设置队列持久化、设置消息持久化

修改之前代码

/**
         * 生成一个队列
         * 1.队列名称
         * 2.队列里面的消息是否持久化 默认消息存储在内存中
         * 3.该队列是否只供一个消费者进行消费 是否进行共享 true可以多个消费者消费
         * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
         * 5.其他参数
         */
        channel.queueDeclare(queue_name,true,false,false,null);
       
        /**
         * 发送一个消息
         * 1.发送到那个交换机  ""默认
         * 2.路由的key是哪个
         * 3.其他的参数信息   持久化MessageProperties.PERSISTENT_TEXT_PLAIN
         * 4.发送消息的消息体
         */
        channel.basicPublish("",queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

需要重新创建一个队列,否则报错,因为队列一旦创建,就不能修改是否持久化了

消息分发

公平分发:轮询不公平的原因可能会造成其中一个消费者消息积压

 //设置公平分发消息
        int prefetchCount = 1;  //默认为0
        channel.basicQos(prefetchCount);

预取值:prefetchCount 写其他数的时候信道会积压消息    自测取值应该是竞争关系

交换机

RabbitMQ消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机,交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

类型:直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)

空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由routingKey(bindingkey)绑定key指定的

临时队列

每当我们连接到Rabbit时,我们都需要一个全新的空队列,为此我们可以创建一个具有**随机名称的队列**,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。

创建临时队列的方式如下:

String queueName = channel.queueDeclare().getQueue();

3.发布订阅者模式

群发,每个消费者都有自己的队列

代码:创建连接工具类

public class RabbitMqUtils {
    //得到一个连接的channel
    public static Channel getChannel() throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.6.100");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

生产者

public class EmitLog {
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //发消息 群发
        Scanner sc = new Scanner(System.in);
        System.out.println("请输入信息");
        while (sc.hasNext()) {
            String message = sc.nextLine();
            channel.basicPublish("logs", "", null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息" + message);
        }
    }
}

消费者,在复制一份

public class ReceiveLogs01 {
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //临时队列
        String q1 = channel.queueDeclare().getQueue();
//        String q2 = channel.queueDeclare().getQueue();
        //交换机  名称、类型
        channel.exchangeDeclare("logs","fanout");
        //绑定 队列名、交换机名、连接名
        channel.queueBind(q1,"logs","");
//        channel.queueBind(q2,"logs","");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("控制台打印接收到的消息"+message);
        };
        channel.basicConsume(q1, true, deliverCallback, consumerTag -> { });
    }
}

先启动消费者,即可创建对列进行绑定,在启动生产者发送消息,即可全部收到  

前两种:无名交换机

订阅者模式:使用扇出交换机,群发

路由模式:直接交换机

主题模式:主题交换机

4.路由模式

多重绑定,通过绑定的名称可以设置给那个消费者发送消息

消费者1

//接受日志
public class ReceiveLogsDirect01 {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = "disk";
        //队列
        channel.queueDeclare(queueName, false, false, false, null);
        //绑定
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            message="接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;
            System.out.println("错误日志已经接收");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

消费者2

//接受日志
public class ReceiveLogsDirect02 {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = "console";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

生产者

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //创建多个bindingKey
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("info","普通info信息");
            bindingKeyMap.put("warning","警告warning信息");
            bindingKeyMap.put("error","错误error信息");
            //debug没有消费这接收这个消息 所有就丢失了
            bindingKeyMap.put("debug","调试debug信息");
            for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
                String bindingKey = bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));
                System.out.println("生产者发出消息:" + message);
            }
        }
    }
}

5.主题模式

相比于路由模式可以用通配符进行匹配

星号可以代替一个单词

#可以替代零个或多个单词

 代码

public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitUtils.getChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            /**
             * Q1-->绑定的是
             *      中间带orange带3个单词的字符串(*.orange.*)
             * Q2-->绑定的是
             *      最后一个单词是rabbit的3个单词(*.*.rabbit)
             *      第一个单词是lazy的多个单词(lazy.#)
             *
             */
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("quick.orange.rabbit","被队列Q1Q2接收到");
            bindingKeyMap.put("lazy.orange.elephant","被队列Q1Q2接收到");
            bindingKeyMap.put("quick.orange.fox","被队列Q1接收到");
            bindingKeyMap.put("lazy.brown.fox","被队列Q2接收到");
            bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接收一次");
            bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
            bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
            bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配Q2");

            for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
                String bindingKey = bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));
                System.out.println("生产者发出消息" + message);
            }
        }
    }
}
public class ReceiveLogsTopic01 {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //声明Q1队列与绑定关系
        String queueName="Q1";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");

        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("接收队列:"+queueName+"绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}
public class ReceiveLogsTopic02 {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //声明Q2队列与绑定关系
        String queueName="Q2";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("接收队列:"+queueName+"绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

举报

相关推荐

0 条评论