目录
- Windows下操作
- Linux下操作
Windows下操作
1. 安装kafka kafka_2.12-3.6.2
官网: https://kafka.apache.org/downloads
文件目录结构如下:
bin | linux系统下可执行脚本文件 |
---|---|
bin/windows | windows系统下可执行脚本文件 |
config | 配置文件 |
libs | 依赖类库 |
licenses | 许可信息 |
site-docs | 文档 |
logs | 服务日志 |
2. 启动Zookeeper
2.1 进入Kafka的config目录,修改zookeeper.properties配置文件
dataDir=F:/Apache/kafka_2.13-3.6.2/data/zk
进入cmd界面
cd F:/Apache/kafka_2.13-3.6.2/bin/windows
启动命令:
zookeeper-server-start.bat ../../config/zookeeper.properties
在解压目录下创建快速启动脚本:zk.cmd
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
3. 启动kafka
3.1 进入Kafka的config目录,修改server.properties配置文件
log.dirs=F:/Apache/kafka_2.13-3.6.2/data/kafka
进入cmd界面
cd F:/Apache/kafka_2.13-3.6.2/bin/windows
启动命令:
kafka-server-start.bat ../../config/server.properties
在解压目录下创建快速启动脚本:kfk.cmd
call bin/windows/kafka-server-start.bat config/server.properties
4.消费主题
消息发布/订阅(Publish/Subscribe)、将不同的消息进行分类,分成不同的主题(Topic)
4.1 创建主题
进入cmd界面
cd F:/Apache/kafka_2.13-3.6.2/bin/windows
创建命令:
kafka-topics.bat --bootstrap-server localhost:9092 --create --topic test
4.2 查询主题
kafka-topics.bat --bootstrap-server localhost:9092 --list
4.2.1 查询指定主题信息
kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test
4.3 修改主题
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions 2
4.4 删除主题
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --delete
5. 生产数据
进入cmd界面
cd F:/Apache/kafka_2.13-3.6.2/bin/windows
输入生产数据:
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
5.1 工具操作
指定的话:java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -jar kafdrop-4.0.2.jar --kafka.brokerConnect=localhost:9092
访问http://localhost:9000/
- kafka Tool(有点难用)
- Java API
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
</dependencies>
创建类:
package cn.coisini;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
public class KafkaProducerTest {
public static void main(String[] args) {
// 配置属性集合
Map<String, Object> configMap = new HashMap<>();
// 配置属性:Kafka服务器集群地址
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka生产者对象,建立Kafka连接
// 构造对象时,需要传递配置参数
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// 准备数据,定义泛型
// 构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key1", "value1");
// 生产(发送)数据
producer.send(record);
// 关闭生产者连接
producer.close();
}
}
6. 消费数据
进入cmd界面
cd F:/Apache/kafka_2.13-3.6.2/bin/windows
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
JavaAPi调用:
public class KafkaConsumerTest {
public static void main(String[] args) {
// 配置属性集合
Map<String, Object> configMap = new HashMap<String, Object>();
// 配置属性:Kafka集群地址
configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 配置属性: Kafka传输的数据为KV对,所以需要对获取的数据分别进行反序列化
configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 配置属性: 读取数据的位置 ,取值为earliest(最早),latest(最晚)
configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
// 配置属性: 消费者组
configMap.put("group.id", "coisini");
// 配置属性: 自动提交偏移量
configMap.put("enable.auto.commit", "true");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configMap);
// 消费者订阅指定主题的数据
consumer.subscribe(Collections.singletonList("test"));
while ( true ) {
// 每隔100毫秒,抓取一次数据
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
// 打印抓取的数据
for (ConsumerRecord<String, String> record : records) {
System.out.println("K = " + record.key() + ", V = " + record.value());
}
}
}
}
7. 源码关联(可忽略)
8. Kafka集群部署
创建文件夹kafka-cluster
8.1 安装Zookeeper
-
解压kafka_2.13-3.6.2.tgz文件,修改目录目录名为
zookeeper
-
修改config/zookeeper.properties文件
dataDir=F:/kafka-cluster/zookeeper/data clientPort=2181
8.2 安装kafka
-
解压kafka_2.13-3.6.2.tgz文件,修改目录目录名为
kafka-node-1
-
修改config/server.properties配置文件
broker.id=1 listeners=PLAINTEXT://:9091 log.dirs=F:/kafka-cluster/kafka-node-1/data zookeeper.connect=localhost:2181/kafka
分别拷贝修改目录名为kafka-node-2
、kafka-node-3
配置文件中 broker.id=1 改为 broker.id=2
、broker.id=3
配置文件中 端口 9091 改为 9092
、9093
配置文件中 数据目录kafka-broker-1 改为 kafka-node-2
、kafka-node-3
8.3 启动脚本
-
kafka-zookeeper 目录下 创建 zk.cmd
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
-
kafka-broker-1、2、3目录下 创建 kfk.cmd
call bin/windows/kafka-server-start.bat config/server.properties
-
kafka-cluster 集群目录下 创建启动 cluster.cmd 批处理文件
cd kafka-zookeeper start zk.cmd ping 127.0.0.1 -n 10 >nul cd ../kafka-node-1 start kfk.cmd cd ../kafka-node-2 start kfk.cmd cd ../kafka-node-3 start kfk.cmd
- 创建清理和重置kafka数据的 cluster-clear.cmd 批处理文件
cd kafka-zookeeper
rd /s /q data
cd ../kafka-node-1
rd /s /q data
cd ../kafka-node-2
rd /s /q data
cd ../kafka-node-3
rd /s /q data
Linux下操作
1. 安装zookeeper
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.2/apache-zookeeper-3.7.2-bin.tar.gz
tar -zxvf apache-zookeeper-3.7.2-bin.tar.gz
mv apache-zookeeper-3.7.2-bin zookeeper-3.7.2
mkdir zookeeper-3.7.2/zkData
echo “1” > zookeeper-3.7.2/zkData/myid
cd zookeeper-3.7.2/conf/
mv zoo_sample.cfg zoo.cfg
vi zoo.cfg
dataDir=/opt/coisini/kafka/zookeeper-3.7.2/zkData
cd …
bin/zkServer.sh start
bin/zkServer.sh stop
bin/zkServer.sh status
2. 安装kafka
官网: https://kafka.apache.org/downloads
wget https://downloads.apache.org/kafka/3.6.2/kafka_2.13-3.6.2.tgz
tar -zxvf kafka_2.13-3.6.2.tgz
mkdir kafka_2.13-3.6.2/kfkData
cd kafka_2.13-3.6.2/config/
log.dirs=/opt/coisini/kafka/kafka_2.13-3.6.2/kfkData
broker.id=0
advertised.listeners=PLAINTEXT://coisini:9092
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-stop.sh
开放端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
(未完结,后续有时间再写)