1. Import dependencies
<!-- commons-io utility library -->
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>1.4</version>
</dependency>
<!-- log4j -->
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.16</version>
</dependency>
<!-- kafka client -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
<!-- fastjson, used to serialize the Order entity to JSON -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>
2. Basic producer implementation
package Producer;
import com.alibaba.fastjson.JSON;
import entity.Order;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
public class MyProducer {

    private final static String TOPIC_NAME = "my-one-topic";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Kafka cluster address (bootstrap servers)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "123.60.111.191:9091");
        /**
         * Message durability (acks) settings:
         * acks = 0: the producer sends the next message without waiting for any acknowledgement from the broker.
         *           Highest throughput, but messages are easily lost.
         * acks = 1: the producer waits until the leader has written the record to its local log, but not until all
         *           followers have replicated it. If a follower has not copied the data yet and the leader then
         *           crashes, the message is lost.
         * acks = all (or -1): the producer waits until min.insync.replicas (default 1, recommended >= 2) replicas
         *           have written the record to their logs. As long as one in-sync replica survives, no data is
         *           lost. This is the strongest guarantee.
         */
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        /**
         * Failed sends are retried (the default retry backoff is 100ms). Retries improve delivery reliability but
         * can also lead to duplicate sends (e.g. after a network jitter), so the consumer side should handle
         * messages idempotently.
         */
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        // retry backoff interval in milliseconds
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
        // local send buffer: records are first collected here, which improves send throughput.
        // The default is 33554432 bytes, i.e. 32MB.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        /**
         * A background sender thread pulls records from the buffer and sends them to the broker in batches.
         * batch.size sets the batch size; the default is 16384 bytes (16KB), i.e. a batch is sent once it is full.
         */
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        /**
         * linger.ms defaults to 0, meaning records are sent immediately, which hurts throughput.
         * Around 10ms is common: a record first goes into a local batch; if the batch fills up to 16KB within
         * 10ms it is sent right away, otherwise the batch is sent after 10ms anyway so records are not delayed
         * for too long.
         */
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // serialize keys from String to byte arrays
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // serialize values from String to byte arrays
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // create the producer client
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        int msgNum = 5;
        // CountDownLatch used to wait for all async send callbacks to complete
        final CountDownLatch countDownLatch = new CountDownLatch(msgNum);
        for (int i = 0; i < msgNum; i++) {
            Order order = new Order((long) i, i);
            // send to an explicitly specified partition
            /*ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,
                    0, order.getOrderId().toString(), JSON.toJSONString(order));*/
            // no partition specified; the target partition is computed as hash(key) % partitionNum
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,
                    order.getOrderId().toString(), JSON.toJSONString(order));
            /*try {
                // synchronous send: get() blocks until the send completes
                RecordMetadata metadata = producer.send(producerRecord).get();
                System.out.println("sync send result: " + "topic-" + metadata.topic() + "|partition-"
                        + metadata.partition() + "|offset-" + metadata.offset());
            } catch (InterruptedException e) {
                e.printStackTrace();
                // 1. log the failure and trigger an alert
                // 2. wait (e.g. 1s) and retry; if it still fails, alert and escalate to manual handling
                try {
                    RecordMetadata metadata = producer.send(producerRecord).get();
                } catch (Exception e1) {
                    e1.printStackTrace();
                    // manual handling
                }
            } catch (ExecutionException e) {
                e.printStackTrace();
            }*/
            // asynchronous send with a callback
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        System.out.println("failed to send message: " + e.getMessage());
                    }
                    if (recordMetadata != null) {
                        System.out.println("async send result: " + "topic-" + recordMetadata.topic() + "|partition-"
                                + recordMetadata.partition() + "|offset-" + recordMetadata.offset());
                    }
                    countDownLatch.countDown();
                }
            });
        }
        // wait for all async callbacks to fire, then release the producer's resources
        countDownLatch.await();
        producer.close();
    }
}
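The producer above imports entity.Order, but that class is not shown. A minimal sketch of the entity, assuming only the constructor Order(Long, int) and the getOrderId() getter that the code actually uses (field names are guesses; fastjson serializes it through its getters):

package entity;

// Hypothetical minimal Order entity matching how the producer uses it
public class Order {

    private Long orderId;
    private Integer count;

    public Order(Long orderId, Integer count) {
        this.orderId = orderId;
        this.count = count;
    }

    public Long getOrderId() {
        return orderId;
    }

    public Integer getCount() {
        return count;
    }
}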
3. Basic consumer implementation
package Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
 * Consumer
 */
public class MyConsumer {

    private final static String TOPIC_NAME = "my-one-topic";
    // consumer group
    private final static String CONSUMER_GROUP_NAME = "group-lzl";
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker cluster address
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "123.60.111.191:9091");
        // whether offsets are committed automatically, default is true; disabled here so offsets are committed manually
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // auto-commit interval (only takes effect when auto-commit is enabled)
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // consumer group name
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        // deserialize keys and values from byte arrays back to Strings
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // create the consumer client
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // subscribe to the topic list
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("received message: partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
            // auto-commit is disabled above, so manually commit the offsets of this poll
            if (!records.isEmpty()) {
                consumer.commitSync();
            }
        }
    }
}
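Because enable.auto.commit is set to false, the loop above commits offsets synchronously after each poll. If blocking on every commit is a concern, KafkaConsumer.commitAsync can be used instead. A minimal sketch of an alternative poll loop under the same setup as above:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("received message: partition = %d, offset = %d, key = %s, value = %s%n",
                record.partition(), record.offset(), record.key(), record.value());
    }
    if (!records.isEmpty()) {
        // non-blocking commit; the callback only logs failures
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                System.out.println("offset commit failed: " + offsets + ", cause: " + exception.getMessage());
            }
        });
    }
}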