0
点赞
收藏
分享

微信扫一扫

Kafka的简单使用

📢 此文档中的命令是基于mac或者linux,windows下需要把bin/xxx.sh替换成bin/windows/xxxx.bat

一、下载

http://kafka.apache.org/downloads

二、启动

1.启动zookeeper

进入安装路径后,在路径栏输入cmd

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

2.启动kafka

再次进入安装路径后,在路径栏输入cmd,新开一个cmd窗口

./bin/kafka-server-start.sh ./config/server.properties

注意:以上两个窗口不要关闭

三、测试

1.创建主题XXX

./bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic XXX

低版本的使用--zookeeper 高版本使用--bootstrap-server 端口号要也要根据版本,填写zookeeper(2181)或者kafka的端口号(9092)

2.查看主题

./bin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092

低版本的使用--zookeeper 高版本使用--bootstrap-server 端口号要也要根据版本,填写zookeeper(2181)或者kafka的端口号(9092)

3.修改主题

# 修改主题topic1的分区为3个
./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter  --topic topic1 --partitions 3

低版本的使用--zookeeper 高版本使用--bootstrap-server 端口号要也要根据版本,填写zookeeper(2181)或者kafka的端口号(9092)

4.创建生产者

进入安装路径,打开一个新的cmd窗口

./bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic topic1

5.创建消费者

进入安装路径,打开一个新的cmd窗口

./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic XXX --from-beginning

低版本的使用--zookeeper 高版本使用--bootstrap-server 端口号要也要根据版本,填写zookeeper(2181)或者kafka的端口号(9092)

四、代码实现消费者

package test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class KafkaConsumer {

	public static void main(String[] args) {
		String servers = "127.0.0.1:9092";
		String groupId = "test-consumer-group"; // 消费组名称
		String[] topic = new String[] { "topic1" }; // 主题名称

		Properties props = new Properties();
		props.put("bootstrap.servers", servers);
		props.put("group.id", groupId); // 消费组id
		props.put("enable.auto.commit", "true"); // 设置自动提交偏移量,在每次poll的时候,会提交上一次处理的消息的偏移量
		props.put("auto.commit.interval.ms", "1000"); // 自动提交偏移量的时间间隔
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("auto.offset.reset", "earliest"); // 偏移量重置的规则,earliest:使用最早的消息的偏移量;latest:使用最新消息的偏移量

		// 创建消费者
		org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
		// 订阅主题消息
		consumer.subscribe(Arrays.asList(topic));
		try {
			while (true) {
				// 拉取消息
				ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
				for (ConsumerRecord<String, String> record : records) {
					System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
					System.out.println("");
					// TODO 在此处理一些业务逻辑
					// doSomeThing();
				}
			}
		} finally {
			consumer.close();
		}
	}
}

举报

相关推荐

0 条评论