Test Code
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kaven</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
</project>
Create Topics:
package com.kaven.kafka.admin;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

public class Admin {

    // Create an AdminClient instance from the Kafka bootstrap servers and the request timeout
    private static final AdminClient adminClient = Admin.getAdminClient(
            "192.168.1.9:9092,192.168.1.9:9093,192.168.1.9:9094",
            "40000");

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Admin admin = new Admin();
        // Create the topic topic-in with 1 partition and a replication factor of 1
        admin.createTopic("topic-in", 1, (short) 1);
        // Create the topic topic-out with 1 partition and a replication factor of 1
        admin.createTopic("topic-out", 1, (short) 1);
        // Release the AdminClient's resources once the topics have been created
        adminClient.close();
    }

    public static AdminClient getAdminClient(String address, String requestTimeoutMS) {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, address);
        properties.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMS);
        return AdminClient.create(properties);
    }

    public void createTopic(String name, int numPartitions, short replicationFactor) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        CreateTopicsResult topics = adminClient.createTopics(
                Collections.singleton(new NewTopic(name, numPartitions, replicationFactor))
        );
        Map<String, KafkaFuture<Void>> values = topics.values();
        values.forEach((topicName, future) -> {
            future.whenComplete((a, throwable) -> {
                if (throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(topicName);
                latch.countDown();
            });
        });
        latch.await();
    }
}
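After the topics are created, it is easy to double-check that they exist. A minimal sketch of such a check, reusing the adminClient above — this method is an addition for illustration, not part of the original class:

public void listTopics() throws InterruptedException, ExecutionException {
    // List the topic names visible to this client (internal topics are excluded by default)
    ListTopicsResult result = adminClient.listTopics();
    for (String topicName : result.names().get()) {
        System.out.println(topicName);
    }
}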
Producer
Publish messages:
package com.kaven.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class ProducerTest {

    public static final String[] MESSAGES = new String[]{
            "Give me the strength lightly to bear my joys and sorrows",
            "Give me the strength to make my love fruitful in service",
            "Give me the strength never to disown the poor or bend my knees before insolent might",
            "Give me the strength to raise my mind high above daily trifles",
            "And give me the strength to surrender my strength to thy will with love"
    };

    public static void main(String[] args) {
        send("topic-in");
    }

    public static void send(String name) {
        Producer<String, String> producer = ProducerTest.createProducer();
        for (int i = 0; i < MESSAGES.length; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
                    name,
                    "key-" + i,
                    MESSAGES[i]
            );
            // Send asynchronously with a completion callback
            producer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("topic: %s, partition: %s, offset: %s\n", name, metadata.partition(), metadata.offset());
                }
                else {
                    exception.printStackTrace();
                }
            });
        }
        // Close the Producer instance (this also flushes any buffered records)
        producer.close();
    }

    public static Producer<String, String> createProducer() {
        // Producer configuration
        Properties properties = new Properties();
        // Bootstrap servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.9:9092,192.168.1.9:9093,192.168.1.9:9094");
        // Serializer class for keys
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer class for values
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(properties);
    }
}
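producer.send() is asynchronous: it returns a Future, and the callback fires once the broker has acknowledged the record. When blocking until acknowledgement is acceptable (for example in a quick test), the returned Future can be waited on instead — a small sketch, assuming the surrounding code handles InterruptedException and ExecutionException:

// Synchronous variant: block until the broker acknowledges the record
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.printf("topic: %s, partition: %s, offset: %s\n",
        metadata.topic(), metadata.partition(), metadata.offset());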
Consumer
Consume messages:
package com.kaven.kafka.consumer;

import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.*;

public class ConsumerTest {

    public static void main(String[] args) {
        subscribeTopicList(Collections.singletonList("topic-out"));
    }

    public static void subscribeTopicList(List<String> topicList) {
        KafkaConsumer<String, Long> consumer = createConsumer();
        consumer.subscribe(topicList);
        while (true) {
            ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(10000));
            records.forEach((record) -> {
                System.out.printf("topic: %s, partition: %s, offset: %s, key: %s, value: %d\n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            });
        }
    }

    public static KafkaConsumer<String, Long> createConsumer() {
        // Consumer configuration
        Properties properties = new Properties();
        // Bootstrap servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.9:9092,192.168.1.9:9093,192.168.1.9:9094");
        // Group ID, identifying the consumer group this consumer belongs to
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kaven-test");
        // Enable automatic offset commits
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // How often (in milliseconds) offsets are auto-committed to Kafka
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Deserializer class for keys
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Deserializer class for values
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongDeserializer");
        return new KafkaConsumer<>(properties);
    }
}
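With auto-commit enabled, offsets are committed on a timer, whether or not the records were actually processed. A common alternative is to set ENABLE_AUTO_COMMIT_CONFIG to "false" and commit manually after each batch — a sketch of the poll loop under that assumption:

// Assumes ENABLE_AUTO_COMMIT_CONFIG was set to "false" in createConsumer()
while (true) {
    ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(10000));
    records.forEach((record) -> System.out.printf("topic: %s, offset: %s, key: %s, value: %d\n",
            record.topic(), record.offset(), record.key(), record.value()));
    // Commit the offsets of the records just returned, once they have been processed
    consumer.commitSync();
}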
Streams
Process messages:
package com.kaven.kafka.stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class StreamTest {

    public static void main(String[] args) {
        // Streams configuration
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.9:9092,192.168.1.9:9093,192.168.1.9:9094");
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Build the stream topology
        StreamsBuilder builder = createStreamsBuilder();
        // Create the KafkaStreams instance from the topology and the configuration
        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();
    }

    // Build the stream topology
    public static StreamsBuilder createStreamsBuilder() {
        StreamsBuilder builder = new StreamsBuilder();
        // Source: read messages from the topic topic-in
        KStream<String, String> source = builder.stream("topic-in");
        /*
         * Processing, i.e. the word count:
         * 1. Split each string on spaces, turning one string into a list of words
         * 2. Group by value (the grouping key), where the value is a word
         * 3. Count the number of records in the stream per grouping key
         */
        KTable<String, Long> count = source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
                .groupBy((key, value) -> value)
                .count();
        // Sink: write the counts to the topic topic-out
        count.toStream().to("topic-out", Produced.with(Serdes.String(), Serdes.Long()));
        return builder;
    }
}
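streams.start() returns immediately, and the application keeps running until the JVM exits. In practice it is common to register a shutdown hook so the Streams instance shuts down cleanly; the built topology can also be printed, which helps when debugging. A sketch of what the end of main could look like with these additions — the hook and the describe() call are my additions, not part of the original code (an extra import of org.apache.kafka.streams.Topology is needed):

// Build the topology once so it can be inspected before starting
Topology topology = builder.build();
// Print the processor topology, useful for debugging
System.out.println(topology.describe());
KafkaStreams streams = new KafkaStreams(topology, properties);
// Close the Streams instance cleanly when the JVM shuts down
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();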
First create the two topics, topic-in and topic-out. Then publish messages to topic-in with the Producer, process them with Streams (the word count: count the words in the messages from topic-in and write the counts to topic-out), and finally consume the messages in topic-out with the Consumer.
Consumer output:
topic: topic-out, partition: 0, offset: 35, key: lightly, value: 2
topic: topic-out, partition: 0, offset: 36, key: bear, value: 2
topic: topic-out, partition: 0, offset: 37, key: joys, value: 2
topic: topic-out, partition: 0, offset: 38, key: sorrows, value: 2
topic: topic-out, partition: 0, offset: 39, key: make, value: 2
topic: topic-out, partition: 0, offset: 40, key: fruitful, value: 2
topic: topic-out, partition: 0, offset: 41, key: in, value: 2
topic: topic-out, partition: 0, offset: 42, key: service, value: 2
topic: topic-out, partition: 0, offset: 43, key: never, value: 2
topic: topic-out, partition: 0, offset: 44, key: disown, value: 2
topic: topic-out, partition: 0, offset: 45, key: poor, value: 2
topic: topic-out, partition: 0, offset: 46, key: or, value: 2
topic: topic-out, partition: 0, offset: 47, key: bend, value: 2
topic: topic-out, partition: 0, offset: 48, key: knees, value: 2
topic: topic-out, partition: 0, offset: 49, key: before, value: 2
topic: topic-out, partition: 0, offset: 50, key: insolent, value: 2
topic: topic-out, partition: 0, offset: 51, key: might, value: 2
topic: topic-out, partition: 0, offset: 52, key: raise, value: 2
topic: topic-out, partition: 0, offset: 53, key: mind, value: 2
topic: topic-out, partition: 0, offset: 54, key: high, value: 2
topic: topic-out, partition: 0, offset: 55, key: above, value: 2
topic: topic-out, partition: 0, offset: 56, key: daily, value: 2
topic: topic-out, partition: 0, offset: 57, key: trifles, value: 2
topic: topic-out, partition: 0, offset: 58, key: and, value: 4
topic: topic-out, partition: 0, offset: 59, key: give, value: 10
topic: topic-out, partition: 0, offset: 60, key: me, value: 10
topic: topic-out, partition: 0, offset: 61, key: the, value: 12
topic: topic-out, partition: 0, offset: 62, key: surrender, value: 2
topic: topic-out, partition: 0, offset: 63, key: my, value: 10
topic: topic-out, partition: 0, offset: 64, key: strength, value: 12
topic: topic-out, partition: 0, offset: 65, key: to, value: 12
topic: topic-out, partition: 0, offset: 66, key: thy, value: 2
topic: topic-out, partition: 0, offset: 67, key: will, value: 2
topic: topic-out, partition: 0, offset: 68, key: with, value: 2
topic: topic-out, partition: 0, offset: 69, key: love, value: 4
This matches expectations, because I published the messages to topic-in twice with the Producer.
KStream provides a rich set of stream operations, comparable to Java's Stream API; I will look at the source code in a later post. Modify the createStreamsBuilder method:
// Build the stream topology
public static StreamsBuilder createStreamsBuilder() {
    StreamsBuilder builder = new StreamsBuilder();
    // Source: read messages from the topic topic-in
    KStream<String, String> source = builder.stream("topic-in");
    // Split each string on spaces and print the words directly
    source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
            .foreach((key, value) -> System.out.printf("key: %s, value: %s\n", key, value));
    return builder;
}
Publish messages to topic-in with the Producer again, then process them with Streams (split each string on spaces and print the words directly). The output looks like this:
key: key-0, value: give
key: key-0, value: me
key: key-0, value: the
key: key-0, value: strength
key: key-0, value: lightly
key: key-0, value: to
key: key-0, value: bear
key: key-0, value: my
key: key-0, value: joys
key: key-0, value: and
key: key-0, value: sorrows
key: key-1, value: give
key: key-1, value: me
key: key-1, value: the
key: key-1, value: strength
key: key-1, value: to
key: key-1, value: make
key: key-1, value: my
key: key-1, value: love
key: key-1, value: fruitful
key: key-1, value: in
key: key-1, value: service
key: key-2, value: give
key: key-2, value: me
key: key-2, value: the
key: key-2, value: strength
key: key-2, value: never
key: key-2, value: to
key: key-2, value: disown
key: key-2, value: the
key: key-2, value: poor
key: key-2, value: or
key: key-2, value: bend
key: key-2, value: my
key: key-2, value: knees
key: key-2, value: before
key: key-2, value: insolent
key: key-2, value: might
key: key-3, value: give
key: key-3, value: me
key: key-3, value: the
key: key-3, value: strength
key: key-3, value: to
key: key-3, value: raise
key: key-3, value: my
key: key-3, value: mind
key: key-3, value: high
key: key-3, value: above
key: key-3, value: daily
key: key-3, value: trifles
key: key-4, value: and
key: key-4, value: give
key: key-4, value: me
key: key-4, value: the
key: key-4, value: strength
key: key-4, value: to
key: key-4, value: surrender
key: key-4, value: my
key: key-4, value: strength
key: key-4, value: to
key: key-4, value: thy
key: key-4, value: will
key: key-4, value: with
key: key-4, value: love
Here the words are printed directly, but the data could just as well be stored in MySQL, Redis, or ES; see the sketch below.
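For the Redis case, a rough sketch: it assumes a redis.clients:jedis dependency on the classpath and a Redis instance reachable at localhost:6379, neither of which is part of the original setup, and it ignores the thread-safety concerns a real sink would have to address:

// Hypothetical sink: write each word count into Redis instead of topic-out
Jedis jedis = new Jedis("localhost", 6379);
count.toStream().foreach((word, total) -> jedis.set(word, String.valueOf(total)));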
That's it for this post. If I have gotten anything wrong, or you see things differently, feel free to comment and add your thoughts.