0
点赞
收藏
分享

微信扫一扫

RabbitMqAck--消息确认模式


消息确认模式

  • ​​1.生产者的消息确认​​
  • ​​1.1默认模式​​
  • ​​1.2事务模式​​
  • ​​1.3消息确认模式​​
  • ​​1.4总结​​
  • ​​1.5消息确认模式的确认方式​​
  • ​​2.消费者应答​​
  • ​​2.1自动应答​​
  • ​​2.2手动应答​​
  • ​​3.消费者拒绝接受消息​​
  • ​​4.消息预取​​
  • ​​5.例子​​
  • ​​5.1创建项目​​
  • ​​5.2添加依赖​​
  • ​​5.3创建配置类​​
  • ​​5.4创建模式枚举类​​
  • ​​5.5创建生产者​​
  • ​​5.6创建消费者​​
  • ​​5.7创建消息拒绝的消费者​​
  • ​​5.8生产消息​​
  • ​​5.9消息消费​​
  • ​​5.10拒绝消息的消费者​​

1.生产者的消息确认

  • 事务模式
  • 确认模式
  • 默认模式

1.1默认模式

默认不进行确认,没有任何操作,当消息生产者发送给MQ就完成了一次消息生产。并不管如果MQ发生异常导致消息丢失的问题。

1.2事务模式

事务模式类似数据库中的事务,但是这里的事务是基于信道的,在信道内提交事务,此时才会把消息传送给MQ,如果因为业务上某个消息生产失败,那么可以使用信道的事务的回滚操作,撤回上次提交之前生产的消息。

1.3消息确认模式

消息确认模式采用回调实现的。所以比起事务模式,大大的降低了生产消息的消费,而且还能保证消息的安全性。

1.4总结

默认模式不安全,存在消息丢失的情况,但是消耗最低,速度最快。
事务模式比默认模式安全,不存在消息丢失的情况,但是生产消息的消耗较大,且客户端需要阻塞等待MQ的返回,导致消息生产的吞吐量不高,效率较低。
消息确认模式最为灵活,采用异步回调的方式,不会造成阻塞。而且对于消息来说又是安全的,所以也是使用最多的模式。
其效率远远大于事务模式。

1.5消息确认模式的确认方式

  • 普通确认:waitForConfirms
  • 批量确认:waitForConfirms
  • 异步确认:addConfirmListener

2.消费者应答

消费者应答的两种方式:

  • 自动应答
  • 手动应答

2.1自动应答

自动应答模式下,MQ发送给消费者,然后就会从队列中删除消息。
自动应答模式只会保证MQ到消费者,却无法保证消费者消费异常的情况。

2.2手动应答

手动应答模式下,MQ发送消息给消费者,然后此消息还会在队列中存储,只有收到消费者发送的应答结果,然后才会从队列中删除消息。
这种方式能够保证消息在消费者正确的消费,如果消费者消费出现异常,可以不进行应答,然后MQ会将消息发送给其他的消费者。
注意:如果一个队列只有一个消费者,这个消费者没有进行手动的应答,那么可能会造成死循环导致内存溢出。

3.消费者拒绝接受消息

当消费者无法消费消息时,可以拒绝MQ投递的消息。
basicReject(long deliveryTag, boolean requeue)throws IOException
拒绝一条消息:
deliveryTag消息唯一标识;
requeue是否重传(false为丢弃)
basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException
拒绝多个消息:
deliveryTag拒绝消息的边界
multiple:为true时拒绝所有deliveryTab小于当前消息的所有未确认消息;
requeue:被拒绝的消息是否重传(false为丢弃)

4.消息预取

可以设置信道一次可以传输的消息数量。
basicQos方法。

5.例子

5.1创建项目

RabbitMqAck--消息确认模式_RabbitMQ的消费者应答

5.2添加依赖

RabbitMqAck--消息确认模式_RabbitMQ的消费者应答_02

dependencies {
compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.7.1'
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.26'
compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.26'
testCompile group: 'junit', name: 'junit', version: '4.12'
}

5.3创建配置类

一个普通的java类,定义了一些配置参数:

package com.study.config;

/**
* @author jiayq
*/
public class RabbitMqConfig {

/**
* 队列名称
*/
public static final String QUEUE_NAME = "HelloQueue";

/**
* 主机名称
*/
public static final String HOST_NAME = "localhost";

/**
* 路由键
*/
public static final String ROUNT_KEY = "HelloRounts";

/**
* 交换机
*/
public static final String EXCHANGE_NAME = "HelloExchange";

}

5.4创建模式枚举类

package com.study.neum;

/**
* @author jiayq
*/

public enum RabbitMqAckModel {

/**
* 普通确认模式
*/
COMMON(0,"普通确认模式"),


/**
* 批量确认模式
*/
BATCHACK(1,"批量确认模式"),

/**
* 异步确认模式
*/
ASYNACK(2,"异步确认模式");

/**
* id
*/
private int id;

/**
* name
*/
private String name;

private RabbitMqAckModel(int id, String name) {
this.id = id;
this.name = name;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}
}

package com.study.neum;

/**
* @author jiayq
*/

public enum RabbitMqModel {

/**
* 默认模式
*/
NONE(0,"默认模式"),

/**
* 事务模式
*/
TX(1,"事务模式"),

/**
* 消息确认模式
*/
ACK(2,"确认模式");

/**
* id
*/
private int id;

/**
* name
*/
private String name;

private RabbitMqModel(int id, String name){
this.id = id;
this.name = name;
}

/**
* @return
*/
public int getId() {
return id;
}

/**
* @param id
*/
public void setId(int id) {
this.id = id;
}

/**
* @return
*/
public String getName() {
return name;
}

/**
* @param name
*/
public void setName(String name) {
this.name = name;
}
}

5.5创建生产者

package com.study.produce;

import com.rabbitmq.client.*;
import com.study.config.RabbitMqConfig;
import com.study.neum.RabbitMqAckModel;
import com.study.neum.RabbitMqModel;

import java.util.Scanner;

/**
* @author jiayq
*/
public class Produce {

private static RabbitMqModel rabbitMqModel = RabbitMqModel.ACK;

private static RabbitMqAckModel rabbitMqAckModel = RabbitMqAckModel.ASYNACK;

public static void main(String[] args) {
System.out.println("produce model is " + rabbitMqModel.getName());
if (RabbitMqModel.ACK.equals(rabbitMqModel)) {
System.out.println("produce ack model is " + rabbitMqAckModel.getName());
}
try (Scanner scanner = new Scanner(System.in)) {
while (true) {
System.out.println("Please input message(one row one message,quit to exits):");
String message = scanner.nextLine();
if ("quit".equals(message)) {
System.out.println("Will be quit!!!");
break;
}
sendMessage(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}

private static void sendMessage(String message) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMqConfig.HOST_NAME);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitMqConfig.QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(RabbitMqConfig.EXCHANGE_NAME, "direct", true, false, null);
channel.queueBind(RabbitMqConfig.QUEUE_NAME, RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUNT_KEY);
switch (rabbitMqModel) {
case NONE:
break;
case TX:
channel.txSelect();
break;
case ACK:
channel.confirmSelect();
break;
default:
break;
}
channel.basicPublish(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUNT_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
switch (rabbitMqModel) {
case NONE:
break;
case TX:
channel.txCommit();
break;
case ACK:
switch (rabbitMqAckModel) {
case COMMON:
channel.waitForConfirms();
break;
case BATCHACK:
channel.waitForConfirms();
break;
case ASYNACK:
ConfirmCallback confirmCallback = (deliverTag, multiple) ->{
System.out.println("deliverTag:\t" + deliverTag);
System.out.println("multiple:\t" + multiple);
};
channel.addConfirmListener(confirmCallback, confirmCallback);
break;
default:
break;
}
break;
default:
break;
}
}

}

5.6创建消费者

package com.study.consume;

import com.rabbitmq.client.*;
import com.study.config.RabbitMqConfig;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author jiayq
*/
public class Consume {

/**
* 消费者应答模式
*/
private static boolean consumeAck = false ;

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMqConfig.HOST_NAME);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitMqConfig.QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(RabbitMqConfig.EXCHANGE_NAME, "direct", true,false,null);
channel.queueBind(RabbitMqConfig.QUEUE_NAME, RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUNT_KEY);
DeliverCallback deliverCallback;
CancelCallback cancelCallback;
if (consumeAck) {
deliverCallback = (tag,message) -> {
System.out.println("consumeAck:\t" + consumeAck);
System.out.println("message:\t" + new String(message.getBody(),"utf-8"));
};
cancelCallback = (tag) -> {
System.out.println("consume error,tag:\t" + tag);
};
} else {
deliverCallback = (tag, message) -> {
System.out.println("consumeAck:\t" + consumeAck);
System.out.println("message:\t" + new String(message.getBody(),"utf-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
cancelCallback = (tag) -> {
System.out.println("consumer error,tag:\t" + tag);
};
}
channel.basicConsume(RabbitMqConfig.QUEUE_NAME, consumeAck, deliverCallback, cancelCallback);
}

}

5.7创建消息拒绝的消费者

package com.study.consume;

import com.rabbitmq.client.*;
import com.study.config.RabbitMqConfig;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author jiayq
*/
public class LazyConsume {

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMqConfig.HOST_NAME);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitMqConfig.QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(RabbitMqConfig.EXCHANGE_NAME, "direct", true, false,null);
channel.queueBind(RabbitMqConfig.QUEUE_NAME, RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUNT_KEY);
channel.basicQos(1);
DeliverCallback deliverCallback = (tag, message) -> {
System.out.println("get Message:\t" + new String(message.getBody(),"utf-8"));
channel.basicReject(message.getEnvelope().getDeliveryTag(), true);
channel.basicAck(message.getEnvelope().getDeliveryTag(), true);
};
CancelCallback cancelCallback = (tag) -> {
System.out.println("consume error,tag:\t" + tag);
};
channel.basicConsume(RabbitMqConfig.QUEUE_NAME, false, deliverCallback, cancelCallback);
}

}

5.8生产消息

默认模式

RabbitMqAck--消息确认模式_RabbitMQ的消息确认模式_03


事务模式

RabbitMqAck--消息确认模式_如何保证RabbitMQ的消息安全性_04


确认模式–普通确认模式

RabbitMqAck--消息确认模式_RabbmitMQ的生产者确认_05


确认模式–批量确认模式

RabbitMqAck--消息确认模式_如何保证RabbitMQ的消息安全性_06


确认模式–异步确认模式

RabbitMqAck--消息确认模式_RabbitMQ的消费者应答_07

5.9消息消费

自动应答模式

RabbitMqAck--消息确认模式_RabbitMQ的消息确认模式_08


手动应答模式

RabbitMqAck--消息确认模式_RabbitMQ的消息确认模式_09

5.10拒绝消息的消费者

RabbitMqAck--消息确认模式_RabbitMQ的消息确认模式_10


生产一个消息

RabbitMqAck--消息确认模式_RabbitMQ的消息确认模式_11


此时生产者和拒绝消息的消费者同时启动

RabbitMqAck--消息确认模式_RabbmitMQ的生产者确认_12


生产了一个消息

RabbitMqAck--消息确认模式_如何保证RabbitMQ的消息安全性_13


拒绝消息的消费者受到了消息,但是拒绝处理,并要求重传。

RabbitMqAck--消息确认模式_RabbitMQ的消息确认模式_14


在启动一个消费者

RabbitMqAck--消息确认模式_RabbitMQ是如何保证消息的安全_15


被正常消费

RabbitMqAck--消息确认模式_如何保证RabbitMQ的消息安全性_16

git仓库:
​​​https://github.com/a18792721831/MQ.git​​


举报

相关推荐

0 条评论