0
点赞
收藏
分享

微信扫一扫

kafka拦截器

infgrad 2023-02-05 阅读 71

1. 简单介绍

  kafka拦截器用于生产者和消费者对统一对消息做处理。且可以设置多个拦截器,用于链式调用。

  生产者拦截器可以用于生产消息前做处理,消费者可以用于消费消息前做处理。

2. 简单使用

1. 生产者

package cn.qz.cloud.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CounterInterceptor implements ProducerInterceptor<String, String> {

private int errorCounter = 0;

private int successCounter = 0;


/**
* 发送消息回调
* @return
*/
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}

/**
* 发送完成回调
*/
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
// 统计成功和失败的次数
if (e == null) {
successCounter++;
} else {
errorCounter++;
}
}

/**
* 关闭方法
*/
@Override
public void close() {
// 保存结果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}

/**
* 配置完成方法
*/
@Override
public void configure(Map<String, ?> map) {
System.out.println("111222");
}
}

===
package cn.qz.cloud.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
System.currentTimeMillis() + "," + record.value().toString());
}

@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> map) {

}
}

对生产者设置拦截器

// 2. 构建拦截链
List<String> interceptors = new ArrayList<>();
interceptors.add("cn.qz.cloud.kafka.interceptor.TimeInterceptor");
interceptors.add("cn.qz.cloud.kafka.interceptor.CounterInterceptor");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

2. 消费者

消费者实现另一个接口,设置方法同上:

package cn.qz.cloud.kafka.interceptor;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.Map;

public class MyConsumerInterceptor implements ConsumerInterceptor {
@Override
public ConsumerRecords onConsume(ConsumerRecords records) {
System.out.println("MyConsumerInterceptor===1===" + records.count());
return records;
}

@Override
public void close() {
// System.out.println("MyConsumerInterceptor===2");
}

@Override
public void onCommit(Map offsets) {
// System.out.println("MyConsumerInterceptor===3");
}

@Override
public void configure(Map<String, ?> configs) {
// System.out.println("MyConsumerInterceptor===4");
}
}

设置拦截器

// 2. 构建拦截链
List<String> interceptors = new ArrayList<>();
interceptors.add("cn.qz.cloud.kafka.interceptor.MyConsumerInterceptor");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors)

 

【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】



举报

相关推荐

0 条评论