1. 简单介绍
Kafka 拦截器用于让生产者和消费者对消息做统一处理,并且可以配置多个拦截器,组成拦截链依次调用。
生产者拦截器可以在消息发送前(onSend)以及收到发送结果时(onAcknowledgement)做处理;消费者拦截器可以在消息交给业务代码消费前(onConsume)以及位移提交时(onCommit)做处理。
2. 简单使用
1. 生产者
package cn.qz.cloud.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
 * Producer interceptor that counts successful and failed send acknowledgements
 * and prints both totals when the interceptor is closed.
 *
 * <p>The Kafka {@code ProducerInterceptor} contract states that callbacks may be
 * invoked from multiple threads, so the counters are atomic: increments are not
 * lost and {@link #close()} sees the latest totals.
 */
public class CounterInterceptor implements ProducerInterceptor<String, String> {

    /** Number of acknowledgements that carried an exception. */
    private final java.util.concurrent.atomic.AtomicInteger errorCounter =
            new java.util.concurrent.atomic.AtomicInteger();

    /** Number of acknowledgements that completed without an exception. */
    private final java.util.concurrent.atomic.AtomicInteger successCounter =
            new java.util.concurrent.atomic.AtomicInteger();

    /**
     * Called before a record is sent; this interceptor does not modify it.
     *
     * @param record the record about to be sent
     * @return the same record, unchanged
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    /**
     * Called when a send completes; updates the success or failure counter.
     *
     * @param recordMetadata metadata of the sent record (unused here)
     * @param e the send failure, or {@code null} when the send succeeded
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            successCounter.incrementAndGet();
        } else {
            errorCounter.incrementAndGet();
        }
    }

    /**
     * Called when the interceptor is closed; reports the collected totals.
     */
    @Override
    public void close() {
        System.out.println("Successful sent: " + successCounter.get());
        System.out.println("Failed sent: " + errorCounter.get());
    }

    /**
     * Called with the client configuration; only prints a marker line.
     *
     * @param map the configuration key/value pairs (unused here)
     */
    @Override
    public void configure(Map<String, ?> map) {
        System.out.println("111222");
    }
}
===
package cn.qz.cloud.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
 * Producer interceptor that prefixes every record value with the current
 * wall-clock time in milliseconds, in the form {@code "<millis>,<value>"}.
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    /**
     * Builds a copy of the record whose value is prefixed with the current time.
     *
     * <p>Topic, partition, timestamp, key and headers are carried over from the
     * original record. A record with a {@code null} value is returned unchanged,
     * because there is nothing to prefix and dereferencing the value would throw.
     *
     * @param record the record about to be sent
     * @return the record to send: a prefixed copy, or the original when its value is {@code null}
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String value = record.value();
        if (value == null) {
            return record;
        }
        String stampedValue = System.currentTimeMillis() + "," + value;
        // Use the headers-aware constructor so existing headers are not dropped.
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), stampedValue, record.headers());
    }

    /** No-op: this interceptor does not react to acknowledgements. */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
    }

    /** No-op: there are no resources to release. */
    @Override
    public void close() {
    }

    /** No-op: this interceptor takes no configuration. */
    @Override
    public void configure(Map<String, ?> map) {
    }
}
对生产者设置拦截器
// 2. Build the interceptor chain: TimeInterceptor is registered ahead of CounterInterceptor
List<String> interceptorChain = new ArrayList<>();
interceptorChain.add("cn.qz.cloud.kafka.interceptor.TimeInterceptor");
interceptorChain.add("cn.qz.cloud.kafka.interceptor.CounterInterceptor");
// Hand the fully qualified class names to the producer configuration
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorChain);
2. 消费者
消费者拦截器需要实现另一个接口 ConsumerInterceptor,设置方法与生产者类似:
package cn.qz.cloud.kafka.interceptor;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
/**
 * Consumer interceptor that prints the size of every fetched batch and passes
 * the records through unchanged.
 *
 * <p>The class is generic so it no longer relies on raw types and can be used
 * with any key/value types.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class MyConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {

    /**
     * Prints the number of records in the batch.
     *
     * @param records the records fetched by the consumer
     * @return the same records, unchanged
     */
    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        System.out.println("MyConsumerInterceptor===1===" + records.count());
        return records;
    }

    /** No-op: there are no resources to release. */
    @Override
    public void close() {
    }

    /**
     * No-op: committed offsets are not inspected.
     *
     * @param offsets the committed offsets per partition (unused here)
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    /**
     * No-op: this interceptor takes no configuration.
     *
     * @param configs the configuration key/value pairs (unused here)
     */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
对消费者设置拦截器
// 2. Build the interceptor chain for the consumer
List<String> interceptors = new ArrayList<>();
interceptors.add("cn.qz.cloud.kafka.interceptor.MyConsumerInterceptor");
// Use the consumer-side constant (fully qualified, so this snippet needs no extra import)
// rather than the producer one.
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】