[Apache Kafka 3.2 Source Code Analysis Series]-3-Starting from a Kafka Demo


Table of Contents

  • 3-Starting from a Kafka Demo
  • 3.1 Introduction
  • 3.2 Writing the Demo
  • 3.2.1 Adding Dependencies and Configuring Logging
  • 3.2.2 Producer Example
  • 3.2.3 Consumer Example
  • 3.2.4 Inspecting Node and Topic Information


3-Starting from a Kafka Demo

3.1 Introduction

To make the theory easier to follow, we will start from a Demo and examine Kafka's implementation in detail. The first thing to do is start a Kafka instance locally; for how to start Kafka, see the earlier article 《1-Kafka知识点全解析》.

Once Kafka is running, we will write a producer example. The producer Demo comes from the examples module in the official Kafka source code, lightly modified here to make it easier to understand.

Here is the directory layout of the producer Demo project:

[Figure: producer Demo project directory layout]

3.2 Writing the Demo

3.2.1 Adding Dependencies and Configuring Logging

First, add the following dependencies:


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.0</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>

Next we write the Demo source code; all clients here use Java as the example.

To make the output easy to read, we use log4j as the slf4j logging implementation and print logs to the console. Configure log4j.properties as follows:

###set log levels###
log4j.rootLogger=info, stdout
###output to the console###
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d{dd/MM/yy HH:mm:ss:SSS z}] %t %5p %c{2}: %m%n
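
With this binding on the classpath, the Kafka client's own logs will appear on the console in the pattern configured above. If you also want to emit logs from the demo code itself, a minimal sketch (the class name here is arbitrary, not part of the demo):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {
    // slf4j is only a facade; slf4j-log4j12 routes the call to log4j,
    // which formats it with the ConversionPattern configured above
    private static final Logger log = LoggerFactory.getLogger(LoggingSketch.class);

    public static void main(String[] args) {
        log.info("demo started");
    }
}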

3.2.2 Producer Example

Finally, we write the producer Demo code, shown below:

package link.elastic.kafka.producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProduceDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("client.id", "DemoProducer");
        props.put("key.serializer", IntegerSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("transaction.timeout.ms", -1); // only read by transactional producers; inert here
        props.put("enable.idempotence", false);  // plain non-idempotent producer

        // Generic types match the configured serializers: Integer keys, String values
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);

        int messageKey = 0;
        int recordsSent = 0;
        int numRecords = 1000;
        String topic = "topic1";
        while (recordsSent < numRecords) {
            String messageStr = "Message_" + messageKey;

            // Send synchronously: get() blocks until the broker acknowledges the record
            try {
                producer.send(new ProducerRecord<>(topic, messageKey, messageStr)).get();
                System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")");
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            messageKey += 2;
            recordsSent += 1;
        }
        System.out.println("Producer sent " + numRecords + " records successfully");
        producer.close(); // flush any buffered records and release network resources
    }
}
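
The loop above sends synchronously, blocking on each send()'s Future. KafkaProducer can also send asynchronously by passing a Callback to send(); a minimal sketch, reusing the producer, topic, messageKey, and messageStr names from the demo above:

// Asynchronous variant: send() returns immediately and the callback runs
// on the producer's I/O thread once the broker responds (or the send fails)
producer.send(new ProducerRecord<>(topic, messageKey, messageStr),
        (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("Sent to partition " + metadata.partition()
                        + " at offset " + metadata.offset());
            }
        });

The synchronous version is easier to reason about, which is why the demo uses it; the asynchronous version gives much higher throughput because records are batched without waiting for each acknowledgement.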

3.2.3 Consumer Example

The consumer Demo code is as follows:

package link.elastic.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same local broker the producer connects to
        props.put("bootstrap.servers", "localhost:9092");
        String groupId = "DemoConsumer";
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        boolean readCommitted = false;
        if (readCommitted) {
            // Only return records from committed transactions
            props.put("isolation.level", "read_committed");
        }
        // Start from the earliest offset when the group has no committed offset yet
        props.put("auto.offset.reset", "earliest");

        // Generic types match the configured deserializers: Integer keys, String values
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
        String topic = "topic1";
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            long count = 0;
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println(groupId + " received message : from partition " + record.partition()
                        + ", (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
                count++;
            }
            System.out.println(groupId + " finished reading " + count + " messages");
        }
    }
}
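
The consumer above relies on auto-commit (enable.auto.commit=true), so offsets are committed in the background every second even if a record later fails during processing. For at-least-once semantics you can commit manually after processing; a minimal sketch of the poll loop body, assuming the same consumer as above but configured with enable.auto.commit set to "false" (process() is a hypothetical application handler):

// Manual-commit variant of the poll loop, assuming
// props.put("enable.auto.commit", "false") during setup
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<Integer, String> record : records) {
    process(record); // hypothetical handler for the application's own logic
}
consumer.commitSync(); // commit offsets only after every record was processed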

3.2.4 Inspecting Node and Topic Information

Click the run button, then look at the nodes in Zookeeper: you can see the partition information and broker node information for the current cluster.

[Figure: partition and broker node information shown in Zookeeper]

topic1 can also be seen in the kafka manager console:

[Figure: topic1 listed in the kafka manager console]
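
Besides Zookeeper and the kafka manager console, the same topic metadata can be fetched programmatically through the Admin client that ships with kafka-clients. A minimal sketch (the class name AdminClientDemo is arbitrary; allTopicNames() is the non-deprecated accessor as of 3.1+):

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;

public class AdminClientDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // try-with-resources closes the underlying network client
        try (Admin admin = Admin.create(props)) {
            // Ask the brokers (not Zookeeper) for the topic's metadata
            TopicDescription desc = admin.describeTopics(Collections.singleton("topic1"))
                    .allTopicNames().get().get("topic1");
            // One line per partition: leader broker and in-sync replicas
            desc.partitions().forEach(p ->
                    System.out.println("partition " + p.partition()
                            + " leader=" + p.leader() + " isr=" + p.isr()));
        }
    }
}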

