0
点赞
收藏
分享

微信扫一扫

【学习笔记】大数据技术之Kafka3.x(监控,外部系统集成)

浮游图灵 2022-04-05 阅读 41

大数据技术之Kafka3.x

第 6 章 Kafka-Eagle 监控

Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况,在生产环境中经常使用。

6.1 MySQL 环境准备

Kafka-Eagle 的安装依赖于 MySQL,MySQL 主要用来存储可视化展示的数据。如果集群中之前安装过 MySQL 可以跨过该步。

6.2 Kafka 环境准备

1)关闭 Kafka 集群

[atguigu@hadoop102 kafka]$ kf.sh stop

2)修改/opt/module/kafka/bin/kafka-server-start.sh 命令中

[atguigu@hadoop102 kafka]$ vim bin/kafka-server-start.sh

修改如下参数值:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
 export JMX_PORT="9999"
 #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

注意:修改之后在启动 Kafka 之前要分发之其他节点

[atguigu@hadoop102 bin]$ xsync kafka-server-start.sh

6.3 Kafka-Eagle 安装

0)官网:https://www.kafka-eagle.org/
1)上传压缩包 kafka-eagle-bin-2.0.8.tar.gz 到集群/opt/software 目录
2)解压到本地

[atguigu@hadoop102 software]$ tar -zxvf kafka-eagle-bin-2.0.8.tar.gz 

3)进入刚才解压的目录

[atguigu@hadoop102 kafka-eagle-bin-2.0.8]$ ll

总用量 79164
-rw-rw-r--. 1 atguigu atguigu 81062577 1013 00:00 efak-web2.0.8-bin.tar.gz

4)将 efak-web-2.0.8-bin.tar.gz 解压至/opt/module

[atguigu@hadoop102 kafka-eagle-bin-2.0.8]$ tar -zxvf efak-web2.0.8-bin.tar.gz -C /opt/module/

5)修改名称

[atguigu@hadoop102 module]$ mv efak-web-2.0.8/ efak

6)修改配置文件 /opt/module/efak/conf/system-config.properties

[atguigu@hadoop102 conf]$ vim system-config.properties
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
efak.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
######################################
# broker size online list
######################################
cluster1.efak.broker.size=20
######################################
# zk client thread limit
######################################
kafka.zk.limit.size=32
######################################
# EFAK webui port
######################################
efak.webui.port=8048
######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456
######################################
# kafka offset storage
######################################
# offset 保存在 kafka
cluster1.efak.offset.storage=kafka

######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.charts=true
efak.metrics.retain=15
######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
######################################
# delete kafka topic token
######################################
efak.topic.token=keadmin
######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramL
oginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainL
oginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=
######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=
######################################
# kafka sqlite jdbc driver address

######################################


# 配置 mysql 连接
efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=000000
######################################
# kafka mysql jdbc driver address
######################################
#efak.driver=com.mysql.cj.jdbc.Driver
#efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=U
TF-8&zeroDateTimeBehavior=convertToNull
#efak.username=root
#efak.password=123456

7)添加环境变量

[atguigu@hadoop102 conf]$ sudo vim /etc/profile.d/my_env.sh
# kafkaEFAK
export KE_HOME=/opt/module/efak
export PATH=$PATH:$KE_HOME/bin

注意:source /etc/profile

[atguigu@hadoop102 conf]$ source /etc/profile

8)启动
(1)注意:启动之前需要先启动 ZK 以及 KAFKA

[atguigu@hadoop102 kafka]$ kf.sh start

(2)启动 efak

[atguigu@hadoop102 efak]$ bin/ke.sh start
Version 2.0.8 -- Copyright 2016-2021
*****************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://192.168.10.102:8048'
* Account:admin ,Password:123456
*****************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*****************************************************************

说明:如果停止 efak,执行命令。

[atguigu@hadoop102 efak]$ bin/ke.sh stop

6.4 Kafka-Eagle 页面操作

1)登录页面查看监控数据
http://192.168.10.102:8048/
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

第 7 章 Kafka-Kraft 模式

2.8.0新特性

7.1 Kafka-Kraft 架构

在这里插入图片描述

左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由controller 进行 Kafka 集群管理。右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理。

这样做的好处有以下几个:

  • Kafka 不再依赖外部框架,而是能够独立运行;
  • controller 管理集群时,不再需要从 zookeeper 中先读取数据,集群性能上升;
  • 由于不依赖 zookeeper,集群扩展时不再受到 zookeeper 读写能力限制;
  • controller 不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策。

7.2 Kafka-Kraft 集群部署

1)再次解压一份 kafka 安装包

[atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

2)重命名为 kafka2

[atguigu@hadoop102 module]$ mv kafka_2.12-3.0.0/ kafka2

3)在 hadoop102 上修改/opt/module/kafka2/config/kraft/server.properties 配置文件

[atguigu@hadoop102 kraft]$ vim server.properties
#kafka 的角色(controller 相当于主机、broker 节点相当于从机,主机类似 zk 功能)
process.roles=broker, controller
#节点 ID
node.id=2
#controller 服务协议别名
controller.listener.names=CONTROLLER
#全 Controller 列表
controller.quorum.voters=2@hadoop102:9093,3@hadoop103:9093,4@hado
op104:9093
#不同服务器绑定的端口
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
#broker 服务协议别名
inter.broker.listener.name=PLAINTEXT
#broker 对外暴露的地址
advertised.Listeners=PLAINTEXT://hadoop102:9092
#协议别名到安全协议的映射
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLA
INTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
#kafka 数据存储目录
log.dirs=/opt/module/kafka2/data

4)分发 kafka2

[atguigu@hadoop102 module]$ xsync kafka2/
  • 在 hadoop103 和 hadoop104 上 需 要 对 node.id 相应改变 , 值 需 要 和
    controller.quorum.voters 对应。
  • 在 hadoop103 和 hadoop104 上需要 根据各自的主机名称,修改相应的
    advertised.Listeners 地址。

5)初始化集群数据目录
(1)首先生成存储目录唯一 ID。

[atguigu@hadoop102 kafka2]$ bin/kafka-storage.sh random-uuid
J7s9e8PPTKOO47PxzI39VA

(2)用该 ID 格式化 kafka 存储目录(三台节点)。

[atguigu@hadoop102 kafka2]$ bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c /opt/module/kafka2/config/kraft/server.properties
[atguigu@hadoop103 kafka2]$ bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c /opt/module/kafka2/config/kraft/server.properties
[atguigu@hadoop104 kafka2]$ bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c /opt/module/kafka2/config/kraft/server.properties

6)启动 kafka 集群

[atguigu@hadoop102 kafka2]$ bin/kafka-server-start.sh -daemon config/kraft/server.properties
[atguigu@hadoop103 kafka2]$ bin/kafka-server-start.sh -daemon config/kraft/server.properties
[atguigu@hadoop104 kafka2]$ bin/kafka-server-start.sh -daemon config/kraft/server.properties

7)停止 kafka 集群

[atguigu@hadoop102 kafka2]$ bin/kafka-server-stop.sh
[atguigu@hadoop103 kafka2]$ bin/kafka-server-stop.sh
[atguigu@hadoop104 kafka2]$ bin/kafka-server-stop.sh

7.3 Kafka-Kraft 集群启动停止脚本
1)在/home/atguigu/bin 目录下创建文件 kf2.sh 脚本文件

[atguigu@hadoop102 bin]$ vim kf2.sh

脚本如下:

#! /bin/bash
case $1 in
"start"){
 for i in hadoop102 hadoop103 hadoop104
 do
 echo " --------启动 $i Kafka2-------"
 ssh $i "/opt/module/kafka2/bin/kafka-server-start.sh -daemon /opt/module/kafka2/config/kraft/server.properties"
 done
};;
"stop"){
 for i in hadoop102 hadoop103 hadoop104
 do
 echo " --------停止 $i Kafka2-------"
 ssh $i "/opt/module/kafka2/bin/kafka-server-stop.sh "
 done
};;
esac

2)添加执行权限

[atguigu@hadoop102 bin]$ chmod +x kf2.sh

3)启动集群命令

[atguigu@hadoop102 ~]$ kf2.sh start

4)停止集群命令

[atguigu@hadoop102 ~]$ kf2.sh sto

第 1 章 集成 Flume

Flume 是一个在大数据开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于Flume 的消费者。
在这里插入图片描述

1.1 Flume 生产者

在这里插入图片描述

(1)启动 kafka 集群

[atguigu@hadoop102 ~]$ zk.sh start
[atguigu@hadoop102 ~]$ kf.sh start

(2)启动 kafka 消费者

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

(3)Flume 安装步骤
在 hadoop102 主机上安装 Flume。
详见:尚硅谷大数据技术之 Flume

(4)配置 Flume
在 hadoop102 节点的 Flume 的 job 目录下创建 file_to_kafka.conf

[atguigu@hadoop102 flume]$ mkdir jobs
[atguigu@hadoop102 flume]$ vim jobs/file_to_kafka.conf 

配置文件内容如下

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*
a1.sources.r1.positionFile = 
/opt/module/flume/taildir_position.json
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 
hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(5)启动 Flume

[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/file_to_kafka.conf &

&: 表示后台启动
(6)向/opt/module/applog/app.log 里追加数据,查看 kafka 消费者消费情况

[atguigu@hadoop102 module]$ mkdir applog
[atguigu@hadoop102 applog]$ echo hello >> 
/opt/module/applog/app.log

(7)观察 kafka 消费者,能够看到消费的 hello 数据

1.2 Flume 消费者

在这里插入图片描述

(1)配置 Flume
在 hadoop102 节点的 Flume 的/opt/module/flume/jobs 目录下创建 kafka_to_file.conf

[atguigu@hadoop102 jobs]$ vim kafka_to_file.conf

配置文件内容如下

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = logger

# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2)启动 Flume

[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/kafka_to_file.conf -Dflume.root.logger=INFO,console

(3)启动 kafka 生产者

[atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

并输入数据,例如:hello world
(4)观察控制台输出的日志

第 2 章 集成 Flink

Flink 是一个在大数据开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于Flink 的消费者。
在这里插入图片描述

1)Flink 环境准备
(1)创建一个 maven 项目 flink-kafka
(2)添加配置文件

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.0</version>
</dependency>
</dependencies>

(3)将 log4j.properties 文件添加到 resources 里面,就能更改打印日志的级别为 error

log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd 
HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd 
HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n

(4)在 java 文件夹下创建包名为 com.atguigu.flink

2.1 Flink 生产者

(1)在 com.atguigu.flink 包下创建 java 类:FlinkKafkaProducer1

package com.atguigu.flink;
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import java.util.ArrayList;
        import java.util.Properties;
public class FlinkKafkaProducer1 {
    public static void main(String[] args) throws Exception {
        // 0 初始化 flink 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        // 1 读取集合中数据
        ArrayList<String> wordsList = new ArrayList<>();
        wordsList.add("hello");
        wordsList.add("world");

        DataStream<String> stream = env.fromCollection(wordsList);
        // 2 kafka 生产者配置信息
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // 3 创建 kafka 生产者
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                "first",
                new SimpleStringSchema(),
                properties
        );
        // 4 生产者和 flink 流关联
        stream.addSink(kafkaProducer);
        // 5 执行
        env.execute();
    }
}

(2)启动 Kafka 消费者

[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

(3)执行 FlinkKafkaProducer1 程序,观察 kafka 消费者控制台情况

2.2 Flink 消费者

(1)在 com.atguigu.flink 包下创建 java 类:FlinkKafkaConsumer1

package com.atguigu.flink;
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.common.serialization.StringDeserializer;
        import java.util.Properties;
public class FlinkKafkaConsumer1 {
    public static void main(String[] args) throws Exception {

        // 0 初始化 flink 环境
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        // 1 kafka 消费者配置信息
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "hadoop102:9092");
        // 2 创建 kafka 消费者
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "first",
                new SimpleStringSchema(),
                properties
        );
        // 3 消费者和 flink 流关联
        env.addSource(kafkaConsumer).print();
        // 4 执行
        env.execute();
    }
}

(2)启动 FlinkKafkaConsumer1 消费者
(3)启动 kafka 生产者

[atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

(4)观察 IDEA 控制台数据打印

第 3 章 集成 SpringBoot

第 4 章 集成 Spark

Spark 是一个在大数据开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于Spark 的消费者。
在这里插入图片描述

1)Scala 环境准备
尚硅谷大数据技术之Scala(3.8).docx
2)Spark 环境准备
(1)创建一个 maven 项目 spark-kafka
(2)在项目 spark-kafka 上点击右键,Add Framework Support=》勾选 scala
(3)在 main 下创建 scala 文件夹,并右键 Mark Directory as Sources Root=>在 scala 下创建包名为 com.atguigu.spark
(4)添加配置文件

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>

(5)将 log4j.properties 文件添加到 resources 里面,就能更改打印日志的级别为 error

log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd 
HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd 
HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n

4.1 Spark 生产者

(1)在 com.atguigu.spark 包下创建 scala Object:SparkKafkaProducer

package com.atguigu.spark
        import java.util.Properties
        import org.apache.kafka.clients.producer.{KafkaProducer,ProducerRecord}
        object SparkKafkaProducer {def main(args: Array[String]): Unit = {
        // 0 kafka 配置信息
        val properties = new Properties()
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        "hadoop102:9092,hadoop103:9092,hadoop104:9092")
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        classOf[StringSerializer])
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        classOf[StringSerializer])
        // 1 创建 kafka 生产者
        var producer = new KafkaProducer[String, String](properties)
        // 2 发送数据
        for (i <- 1 to 5){
        producer.send(new
        ProducerRecord[String,String]("first","atguigu" + i))
        }
        // 3 关闭资源
        producer.close()
        }
        }

(2)启动 Kafka 消费者

[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

(3)执行 SparkKafkaProducer 程序,观察 kafka 消费者控制台情况

4.2 Spark 消费者

(1)添加配置文件

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>

(2)在 com.atguigu.spark 包下创建 scala Object:SparkKafkaConsumer
package com.atguigu.spark

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object SparkKafkaConsumer {
 def main(args: Array[String]): Unit = {
 //1.创建 SparkConf
 val sparkConf: SparkConf = new 
SparkConf().setAppName("sparkstreaming").setMaster("local[*]")
 //2.创建 StreamingContext
 val ssc = new StreamingContext(sparkConf, Seconds(3))
 //3.定义 Kafka 参数:kafka 集群地址、消费者组名称、key 序列化、value 序列化
 val kafkaPara: Map[String, Object] = Map[String, Object](
 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> 
"hadoop102:9092,hadoop103:9092,hadoop104:9092",
 ConsumerConfig.GROUP_ID_CONFIG -> "atguiguGroup",
 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> 
classOf[StringDeserializer],
 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
classOf[StringDeserializer]
 )
 //4.读取 Kafka 数据创建 DStream
 val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = 
KafkaUtils.createDirectStream[String, String](
 ssc,
 LocationStrategies.PreferConsistent, //优先位置
 ConsumerStrategies.Subscribe[String, String](Set("first"), kafkaPara)// 消费策略:(订阅
多个主题,配置参数)
 )
 //5.将每条消息的 KV 取出
 val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
 //6.计算 WordCount
 valueDStream.print()
 //7.开启任务
 ssc.start()
 ssc.awaitTermination()
 }
}

(3)启动 SparkKafkaConsumer 消费者
(4)启动 kafka 生产者

[atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

(5)观察 IDEA 控制台数据打

举报

相关推荐

0 条评论