0
点赞
收藏
分享

微信扫一扫

使用 Clickhouse 集成的表引擎同步数据方式详解

Python芸芸 04-06 11:00 阅读 1

目录

1. kafka 下载

2. 修改配置文件

2.1 文件夹内容

2.2 创建一个 data 空文件夹

2.3 修改 zookeeper.properties 配置文件

2.4 修改 server.properties 配置文件

2.5 创建 "zk.cmd" windows脚本文件

2.6 创建 "kfk.cmd" windows脚本文件

3. 启动 kafka

4. 创建主题+生产者消费者演示

4.1 创建 topic 主题

4.2 命令行创建生产者

4.3 命令行创建消费者

4.4 生产者发送消息供消费者消费

5. demo 书写

5.1 创建项目

5.2 引入依赖

 5.3 生产者测试类代码书写

5.4 消费者测试类代码书写


1. kafka 下载

本片是小白入门篇,所以我们不以Linux操作系统为例,选择大多数小白都用的windows。

ksfka下载链接如下,点击链接进入官网即可下载

温馨提示:JDK版本至少需要1.8,高版本也可兼容;

Apache Kafkaicon-default.png?t=N7T8https://kafka.apache.org/downloads本篇中以kafka_2.1.3-3.6.1版本为例,直接点击对应的版本下载即可,tgz包就类似于我们常见的zip,下载完成之后解压即可。

下载完毕,我们就可以解压得到 kafka 了

 解压之后就可以得到 kafka 文件了

2. 修改配置文件

2.1 文件夹内容

打开文件夹后可以发现内部含有bin文件夹,config配置夹,libs依赖夹等,和JDK,maven 问价夹的格式如出一辙;

2.2 创建一个 data 空文件夹

后续需要用来存放日志文件,只要创建完成就可以了,kafka启动后会自动生成日志文件;

2.3 修改 zookeeper.properties 配置文件

我们点击进入config文件夹,找到 zookeeper.properties 配置文件,双击进行修改,

然后,我们找到 dataDir ,将它的值修改为我们刚才创建的 data 文件的路径,还要注意一点,在后面还要多加一个 "/zk",因为一会还要配置 server.properties ,所以要用将她们两个区分开

2.4 修改 server.properties 配置文件

和刚才一样,我们双击修改 "server.properties" 配置文件

我们修改 log.dirs 的值为刚才创建的 data 文件夹的路径,在路径末尾再添加上 "/kafka" ,用来和刚才的zk做区分,kafka 文件夹用来存放kafka的日志文件,zk 文件夹用来存放zoopeeper的日志文件;

2.5 创建 "zk.cmd" windows脚本文件

  以记事本的方式打开,然后加入下面这句话,

这句话的含义就是启动 Zookeeper ,并且启动文件为 "zookeeper.properties" ;

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
2.6 创建 "kfk.cmd" windows脚本文件

仍然以记事本的方式打开,然后加入下面这句话,

这句话的含义就是启动 kafka ,并且启动文件为 "server.properties" ;

call bin/windows/kafka-server-start.bat config/server.properties

此时,我们的 kafka 文件夹中就多了我们刚刚创建的 data 文件夹,kafka.cmd 脚本文件,zk.cmd 脚本文件;

3. 启动 kafka

经过第二部的配置,现在一切都已经准备就绪,我们只需要双击 zk.cmd 和 kafka.cmd 脚本文件启动kafka;

这里需要注意一点,必须先启动双击 zk.cmd 启动 zookeeper,

再双击 kafka.cmd 启动 kafka,关闭的时候,需要先关闭 kafka,再关闭 zookeeper ;

4. 创建主题+生产者消费者演示

4.1 创建 topic 主题

我们来到 bin 文件夹下的 windows 文件夹,打开 cmd 命令窗口,运行下方命令

# --bootstrap-server localhost:9092  配置服务器连接,此处为本机,9092为kafka默认端口号
# --topic test  创建topic主题,主题名称为 test
# --create      创建topic主题命令
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create     

4.2 命令行创建生产者

仍然是在 windows 文件夹下新建一个命令窗口,刚才我们已经创建出了名为 "topic" 的主题,现在运行如下命令启动脚本文件创建生产者连接上我们的 topic 主题

# 运行 kafka-console-producer.bat 脚本创建生产者连接本机9092端口名为 test 的主题
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test

 运行如下所示,会出现一个小箭头,就说明我们链接主题成功,我们生产者发布的主题都会发送到 topic 主题中供消费者去消费使用;

4.3 命令行创建消费者
# 运行 kafka-console-consumer.bat 脚本创建消费者连接本机9092端口名为 test 的主题
windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

运行成功不会有任何显示, 

4.4 生产者发送消息供消费者消费

如下图所示,我在生产者命令窗口输入 "hello kafka",点击回车,我们就可以在消费者中命令窗口中看到发送过来的 "hello kafka" 消息

5. demo 书写

5.1 创建项目

5.2 引入依赖

在 maven 项目的 pom.xml 文件中加入下方依赖,

<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.1</version>
        </dependency>
    </dependencies>
 5.3 生产者测试类代码书写

我们随便创建一个类即可,名字随意取,代码逻辑备有注释;

public class KafkaProducerTest {
    public static void main(String[] args) {
        // TODO 创建配置对象
        // 创建生产者对象又分为两步
            // 1. 创建配置对象集合
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
            // 2. 配置数据 Key-Value 的序列化方式
        configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // TODO 创建生产者对象
        // 将配置对象集合作为参数传入
        // 返回值就是生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(configMap);

        // TODO 创建数据
        // 自己定义一个数据,传入三个参数,第一个参数为主题,第二个参数为数据的key,第三个参数为数据的value
        ProducerRecord<String,String> record = new ProducerRecord<>("test","first","hello kafka");

        // TODO 发送数据
        kafkaProducer.send(record);

        // TODO 关闭生产者对象
        kafkaProducer.close();
    }
}
5.4 消费者测试类代码书写
public class KafkaConsumerTest {
    public static void main(String[] args) {
        // TODO 创建消费者配置对象
        // 创建消费者配置对象集合
        HashMap<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // TODO 创建消费者对象
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(consumerConfig);

        // TODO 订阅消费主体,主题名称为 test
        consumer.subscribe(Collections.singletonList("test"));

        // TODO 从主题中获取数据消费
        final ConsumerRecords<String, String> datas = consumer.poll(100);

        for (ConsumerRecord<String,String> data : datas){
            System.out.println(data);
        }

        // TODO 关闭消费者对象
        consumer.close();
    }
}
举报

相关推荐

0 条评论