0
点赞
收藏
分享

微信扫一扫

MessageQueue --- RabbitMQ

624c95384278 2024-02-21 阅读 8
rabbitmq

MessageQueue --- RabbitMQ

RabbitMQ Intro

RabbitMQ 核心概念

在这里插入图片描述
消息的路由过程如下:

名词解释:

RabbitMQ 分发类型

Direct Exchange

在这里插入图片描述

Topic Exchange

在这里插入图片描述

Fanout exchange

在这里插入图片描述

Header exchange

在这里插入图片描述

配置RabbitMQ 示例代码

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class DirectExchangeExample {
    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String QUEUE_NAME = "my_queue";
    private static final String ROUTING_KEY = "info";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明一个 Direct Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            // 声明一个队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 绑定队列到 Direct Exchange,并指定绑定键
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            // 定义消息处理函数
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("Received message: " + message);
            };

            // 消费消息
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});

            System.out.println("Press any key to exit.");
            System.in.read();
        }
    }
}

Dead letter (死信)

在这里插入图片描述

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class DeadLetterExample {
    private static final String EXCHANGE_NAME = "normal_exchange";
    private static final String QUEUE_NAME = "normal_queue";
    private static final String DLX_EXCHANGE_NAME = "dlx_exchange";
    private static final String DLX_QUEUE_NAME = "dlx_queue";
    private static final String DLX_ROUTING_KEY = "dlx_routing_key";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 创建普通交换机和队列
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

            // 创建死信交换机和队列
            channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null);
            channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, DLX_ROUTING_KEY);

            // 设置普通队列的死信参数
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
            arguments.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
            channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);

            // 定义消息处理函数
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("Received message: " + message);
                // 手动确认消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };

            // 消费消息
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

            System.out.println("Press any key to exit.");
            System.in.read();
        }
    }
}

保证消息的可靠传递

要确保消息的可靠传递,可以采取以下几个步骤:

发布者确认机制Example

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReliableMessagingExample {
    private static final String QUEUE_NAME = "my_queue";
    private static final String EXCHANGE_NAME = "my_exchange";
    private static final String ROUTING_KEY = "my_routing_key";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接和信道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列和交换机
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        // 启用发布者确认模式
        channel.confirmSelect();

        // 添加发布者确认监听器
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Message confirmed, delivery tag: " + deliveryTag);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Message not confirmed, delivery tag: " + deliveryTag);
                // 可以在这里进行相应的处理,例如重新发送消息
            }
        });

        // 发布消息
        String message = "Hello, RabbitMQ!";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

        try {
            // 等待发布者确认
            channel.waitForConfirmsOrDie();
        } catch (InterruptedException e) {
            // 可以在这里进行相应的处理,例如重新发送消息
            e.printStackTrace();
        }

        // 关闭信道和连接
        channel.close();
        connection.close();
    }
}

事务保证消息可靠性Example

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReliableMessagingExample {
    private static final String QUEUE_NAME = "my_queue";
    private static final String EXCHANGE_NAME = "my_exchange";
    private static final String ROUTING_KEY = "my_routing_key";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接和信道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        try {
            // 开启事务
            channel.txSelect();

            // 声明队列和交换机
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            // 发布消息
            String message = "Hello, RabbitMQ!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

            // 提交事务
            channel.txCommit();
            System.out.println("Transaction committed successfully.");
        } catch (IOException e) {
            // 发生异常,回滚事务
            channel.txRollback();
            System.out.println("Transaction rolled back due to an exception.");
            e.printStackTrace();
        } finally {
            // 关闭信道和连接
            channel.close();
            connection.close();
        }
    }
}
举报

相关推荐

0 条评论