0
点赞
收藏
分享

微信扫一扫

RabbitMq、SpringBoot详解

RabbitMq、SpringBoot详解

一、RabbitMq

pom.xml配置

 <dependencies>
      <!--web整合 -->
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
      </dependency>

      <!--导入配置文件处理器 配置文件进行绑定就会有提示-->
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-configuration-processor</artifactId>
      </dependency>

      <!--spring boot 的单元测试 -->
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
      </dependency>

      <!--mybatis-->
      <dependency>
          <groupId>org.mybatis.spring.boot</groupId>
          <artifactId>mybatis-spring-boot-starter</artifactId>
          <version>2.1.0</version>
      </dependency>

      <!--mysql -->
      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
      </dependency>
      <!--lombok-->
      <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <scope>provided</scope>
      </dependency>

      <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.51</version>
      </dependency>

      <!--创建maven工程导入依赖-->
      <dependency>
          <groupId>org.mybatis.generator</groupId>
          <artifactId>mybatis-generator-core</artifactId>
          <version>1.4.0</version>
      </dependency>

      <!--rabbitmq-->
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
  </dependencies>

1、入门案列

生产者

import com.rabbitmq.client.*;

public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        //创建一个连接工厂
	     ConnectionFactory factory = new ConnectionFactory();
	     factory.setHost("127.0.0.1");
	     factory.setUsername("admin");
	     factory.setPassword("admin");
	     //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
	     Connection connection = factory.newConnection();
	     Channel channel = connection.createChannel();
	
	     /**
	      * 生成一个队列
	      * 1.队列名称
	      * 2.队列里面的消息是否持久化 默认消息存储在内存中
	      * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
	      * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
	      * 5.其他参数
	      */
	     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
	     String message = "hello world";
	     /**
	      * 发送一个消息
	      * 1.发送到那个交换机
	      * 2.路由的 key 是哪个
	      * 3.其他的参数信息
	      * 4.发送消息的消息体
	      */
	     channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
	     System.out.println("消息发送完毕");
    }
}

消费者

import com.rabbitmq.client.*;

public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback = (consumerTag, delivery) ->
        {
            System.out.println(consumerTag);
            String message = new String(delivery.getBody());
            System.out.println(message);
        };

        CancelCallback cancelCallback = (consumerTag) ->
        {
            System.out.println("消息消费被中断");
        };

        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3.消费者未成功消费的回调
         * 4.消费者取消 消费 的回调
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

    }
}

2、消息应答

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡。自动应答性能高数据存在丢失情况。手动应答性能低数据不丢失。 企业中一半都选用手动应答。上面入门案例(默认)就是自动应答。

手动应答

生产者

import cn.yx.zg.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.util.Scanner;

/**
 * 消息手动应答
 */
public class Task02 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel();) {

            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //从控制台当中接受信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println("发送消息完成:" + message);
            }
        }
    }
}

消费者1

import cn.yx.zg.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Worker03 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            System.out.println("任务三:接收到消息:" + receivedMessage);
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };
        System.out.println("C2 消费者启动等待消费.................. ");

        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

消费者2

import cn.yx.zg.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Worker04 {
    private static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("任务四:接收到消息:"+receivedMessage);
            if (receivedMessage.equals("1")){
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            }
        };
        CancelCallback cancelCallback=(consumerTag)-> {System.out.println(consumerTag + "消费者取消消费接口回调逻辑");};
        System.out.println("C2 消费者启动等待消费.................. ");

        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

二、SpringBoot整合RabbitMq

举报

相关推荐

0 条评论