0
点赞
收藏
分享

微信扫一扫

Github上开源了一款AI虚拟试衣,看看效果

目录

Windows下操作


1. 安装kafka kafka_2.12-3.6.2

官网: https://kafka.apache.org/downloads

文件目录结构如下:

binlinux系统下可执行脚本文件
bin/windowswindows系统下可执行脚本文件
config配置文件
libs依赖类库
licenses许可信息
site-docs文档
logs服务日志

2. 启动Zookeeper


2.1 进入Kafka的config目录,修改zookeeper.properties配置文件

dataDir=F:/Apache/kafka_2.13-3.6.2/data/zk

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows

启动命令:

zookeeper-server-start.bat ../../config/zookeeper.properties

在解压目录下创建快速启动脚本:zk.cmd

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

3. 启动kafka


3.1 进入Kafka的config目录,修改server.properties配置文件

log.dirs=F:/Apache/kafka_2.13-3.6.2/data/kafka

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows

启动命令:

kafka-server-start.bat ../../config/server.properties

在解压目录下创建快速启动脚本:kfk.cmd

call bin/windows/kafka-server-start.bat config/server.properties

4.消费主题

消息发布/订阅(Publish/Subscribe)、将不同的消息进行分类,分成不同的主题(Topic)


4.1 创建主题

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows

创建命令:

kafka-topics.bat --bootstrap-server localhost:9092 --create --topic test

4.2 查询主题

kafka-topics.bat --bootstrap-server localhost:9092 --list

4.2.1 查询指定主题信息
kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test

4.3 修改主题

kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions 2

4.4 删除主题

kafka-topics.bat --bootstrap-server localhost:9092 --topic test --delete

5. 生产数据

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows

输入生产数据:

kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test

5.1 工具操作

指定的话:java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -jar kafdrop-4.0.2.jar --kafka.brokerConnect=localhost:9092

访问http://localhost:9000/

  • kafka Tool(有点难用)
  • Java API
  <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.1</version>
        </dependency>
    </dependencies>

创建类:


package cn.coisini;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;

public class KafkaProducerTest {
    public static void main(String[] args) {
        // 配置属性集合
        Map<String, Object> configMap = new HashMap<>();
        // 配置属性:Kafka服务器集群地址
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
        configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 创建Kafka生产者对象,建立Kafka连接
        // 构造对象时,需要传递配置参数
        KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
        // 准备数据,定义泛型
        // 构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key1", "value1");
        // 生产(发送)数据
        producer.send(record);
        // 关闭生产者连接
        producer.close();
    }
}

6. 消费数据

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

JavaAPi调用:

public class KafkaConsumerTest {
    public static void main(String[] args) {
        // 配置属性集合
        Map<String, Object> configMap = new HashMap<String, Object>();
        // 配置属性:Kafka集群地址
        configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 配置属性: Kafka传输的数据为KV对,所以需要对获取的数据分别进行反序列化
        configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 配置属性: 读取数据的位置 ,取值为earliest(最早),latest(最晚)
        configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        // 配置属性: 消费者组
        configMap.put("group.id", "coisini");
        // 配置属性: 自动提交偏移量
        configMap.put("enable.auto.commit", "true");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configMap);
        // 消费者订阅指定主题的数据
        consumer.subscribe(Collections.singletonList("test"));
        while ( true ) {
            // 每隔100毫秒,抓取一次数据
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));
            // 打印抓取的数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("K = " + record.key() + ", V = " + record.value());
            }
        }
    }
}

7. 源码关联(可忽略)


8. Kafka集群部署

创建文件夹kafka-cluster


8.1 安装Zookeeper

  • 解压kafka_2.13-3.6.2.tgz文件,修改目录目录名为zookeeper

  • 修改config/zookeeper.properties文件

    dataDir=F:/kafka-cluster/zookeeper/data
    clientPort=2181
    

8.2 安装kafka

  • 解压kafka_2.13-3.6.2.tgz文件,修改目录目录名为kafka-node-1

  • 修改config/server.properties配置文件

    broker.id=1
    listeners=PLAINTEXT://:9091
    log.dirs=F:/kafka-cluster/kafka-node-1/data
    zookeeper.connect=localhost:2181/kafka
    

分别拷贝修改目录名kafka-node-2kafka-node-3

配置文件中 broker.id=1 改为 broker.id=2broker.id=3

配置文件中 端口 9091 改为 90929093

配置文件中 数据目录kafka-broker-1 改为 kafka-node-2kafka-node-3


8.3 启动脚本

  • kafka-zookeeper 目录下 创建 zk.cmd

    call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
    
  • kafka-broker-1、2、3目录下 创建 kfk.cmd

    call bin/windows/kafka-server-start.bat config/server.properties
    
  • kafka-cluster 集群目录下 创建启动 cluster.cmd 批处理文件

    cd kafka-zookeeper
    start zk.cmd
    ping 127.0.0.1 -n 10 >nul
    cd ../kafka-node-1
    start kfk.cmd
    cd ../kafka-node-2
    start kfk.cmd
    cd ../kafka-node-3
    start kfk.cmd
    
    • 创建清理和重置kafka数据的 cluster-clear.cmd 批处理文件
cd kafka-zookeeper
rd /s /q data
cd ../kafka-node-1
rd /s /q data
cd ../kafka-node-2
rd /s /q data
cd ../kafka-node-3
rd /s /q data


Linux下操作


1. 安装zookeeper

wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.2/apache-zookeeper-3.7.2-bin.tar.gz

tar -zxvf apache-zookeeper-3.7.2-bin.tar.gz

mv apache-zookeeper-3.7.2-bin zookeeper-3.7.2

mkdir zookeeper-3.7.2/zkData

echo “1” > zookeeper-3.7.2/zkData/myid

cd zookeeper-3.7.2/conf/

mv zoo_sample.cfg zoo.cfg

vi zoo.cfg

dataDir=/opt/coisini/kafka/zookeeper-3.7.2/zkData

cd …

bin/zkServer.sh start

bin/zkServer.sh stop

bin/zkServer.sh status


2. 安装kafka

官网: https://kafka.apache.org/downloads

wget https://downloads.apache.org/kafka/3.6.2/kafka_2.13-3.6.2.tgz

tar -zxvf kafka_2.13-3.6.2.tgz

mkdir kafka_2.13-3.6.2/kfkData

cd kafka_2.13-3.6.2/config/

log.dirs=/opt/coisini/kafka/kafka_2.13-3.6.2/kfkData

broker.id=0

advertised.listeners=PLAINTEXT://coisini:9092

bin/kafka-server-start.sh -daemon config/server.properties

bin/kafka-server-stop.sh

开放端口

firewall-cmd --zone=public --add-port=9092/tcp --permanent

firewall-cmd --reload


(未完结,后续有时间再写)

举报

相关推荐

0 条评论