0
点赞
收藏
分享

微信扫一扫

RabbitMQ学习——生产者与消费者入门例子


文章目录

  • ​​生产者​​
  • ​​消费者​​
  • ​​项目代码​​

生产者

package com.learn.rabbitmqapi.message;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Minimal RabbitMQ producer example: connects to the broker and publishes ten
 * text messages ("Hello0" .. "Hello9") through the default exchange, using the
 * queue name as the routing key.
 *
 * <p>This class does not declare the queue itself. Per the accompanying
 * article, start the consumer first so that the queue exists before publishing.
 */
public class Producer {
    public static final String MQ_HOST = "192.168.222.101";
    public static final String MQ_VHOST = "/";
    public static final int MQ_PORT = 5672;

    /** Name of the target queue; kept in sync with the consumer example. */
    public static final String QUEUE_NAME = "testQueue";

    /**
     * Publishes ten messages and then closes the channel and the connection.
     *
     * @param args unused
     * @throws IOException      if connecting, publishing or closing fails
     * @throws TimeoutException if a connection or channel operation times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create and configure the ConnectionFactory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(MQ_HOST);
        connectionFactory.setPort(MQ_PORT);
        connectionFactory.setVirtualHost(MQ_VHOST);

        // 2. Open a connection through the factory.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Create a channel on the connection.
            Channel channel = connection.createChannel();

            // 4. Send the data through the channel.
            for (int i = 0; i < 10; i++) {
                String message = "Hello" + i;
                // An empty exchange name selects the default exchange, which
                // routes by using the routing key as the queue name.
                // Encode explicitly as UTF-8 instead of the platform default.
                channel.basicPublish("", QUEUE_NAME, null,
                        message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }

            // 5. Close the channel on the normal path.
            channel.close();
        } finally {
            // Always release the connection, even when publishing fails, so
            // the socket is not left open on error.
            connection.close();
        }
    }
}

消费者

package com.learn.rabbitmqapi.message;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Minimal RabbitMQ consumer example: connects to the broker, declares the
 * queue, and prints the first {@link #MAX_MESSAGES} messages it receives
 * before shutting down.
 */
public class Consumer {
    public static final String MQ_HOST = "192.168.222.101";
    public static final String MQ_VHOST = "/";
    public static final int MQ_PORT = 5672;

    public static final String QUEUE_NAME = "testQueue";

    /** Number of deliveries to process before the example exits. */
    private static final int MAX_MESSAGES = 8;

    /**
     * Declares the queue, consumes {@link #MAX_MESSAGES} messages with
     * automatic acknowledgement, prints each one, then closes the channel and
     * the connection.
     *
     * @param args unused
     * @throws Exception if any broker operation fails or the wait for a
     *                   delivery is interrupted
     */
    public static void main(String[] args) throws Exception {
        // 1. Create and configure the ConnectionFactory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(MQ_HOST);
        connectionFactory.setPort(MQ_PORT);
        connectionFactory.setVirtualHost(MQ_VHOST);

        // 2. Open a connection through the factory.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Create a channel on the connection.
            Channel channel = connection.createChannel();

            // 4. Declare (create) the queue:
            //    durable = true, exclusive = false, autoDelete = false, no extra arguments.
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // 5. Create the consumer.
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

            // 6. Register the consumer on the channel with autoAck = true.
            // NOTE(review): with autoAck, messages already delivered to this
            // client but not processed before exit are presumably lost - confirm
            // this is acceptable for the demo.
            channel.basicConsume(QUEUE_NAME, true, queueingConsumer);

            // 7. Receive messages.
            for (int received = 0; received < MAX_MESSAGES; received++) {
                // nextDelivery blocks until a message arrives.
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

                // Decode explicitly as UTF-8 instead of the platform default.
                String message = new String(delivery.getBody(),
                        java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("收到:" + message);
            }

            channel.close();
        } finally {
            // Always release the connection, even when consuming fails, so
            // the socket is not left open on error.
            connection.close();
        }
    }
}

先启动消费者(消费者代码会声明并创建队列),再启动生产者。

项目代码

​​下载地址​​


举报

相关推荐

0 条评论