第一步:
先在IDEA里创建一个maven工程
pom如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>com.xiong.rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<!--指定 jdk 编译版本--> <build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build> <dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>
</dependencies>
</project>
第二步:
创建生产者类
package com.xiong.rabbitmq.one;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者:发消息
*/
public class producer {
//队列名称
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂id 连接mq队列
factory.setHost("localhost");
//用户名
factory.setUsername("guest");
//密码
factory.setPassword("guest");
//获取连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
/**
* 生成一个队列
* 第一个参数:队列名称
* 第二个参数:消息是否持久化(默认时,消息都是存储在内存中的)
* 第三个参数:该队列是否只供一个消费者一个消费(true:允许多个消费者消费,false(默认) 一个消费者)
* 第四个参数:是否自动删除(true:是自动删除 )
* 第五个参数:其他参数(高级特性)
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发消息
String msg="hello world";
/**
* 发送一个消息
* 第一个参数:发送到哪一个交换机
* 第二个参数:路由的KEY值 我这个是队列名称
* 第三个参数:其他参数信息
* 第四个参数:发送消息的消息体(需要二进制格式,不能直接发消息)
*/
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("发送出去了+++++++++++++++++");
}
}
第三步:
创建消费者
package com.xiong.rabbitmq.one;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 接受消息(消费者)
*/
public class consumer {
//队列名称
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂id 连接mq队列
factory.setHost("localhost");
//用户名
factory.setUsername("guest");
//密码
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//接受消息回调
DeliverCallback deliverCal = (consumerTag, msg) ->{
System.out.println("消息为"+new String(msg.getBody()));
};
//取消消费消息回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被终止");
};
/**
* 消费者消费消息
*第一个参数:在哪一个队列
* 第二参数:是否要自动应答(true:是 f:手动)
* 第三个参数:消费者未成功消费时的回调
* 第四个参数:消费者取消消费时的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCal,cancelCallback);
}
}
接下来我们来看看控制台的情况:
运行生产者后如图,可以看见 ready为1说明消息发送成功
运行消费者后如图,可以看见 ready为0说明这个消息已经被消费了