Spark big data analytics: Spark Streaming (16) integrating with Kafka

洲行 2022-02-10



Table of contents

  • High-level API
  • Low-level API


Dependency

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.11</artifactId>
  <version>1.6.3</version>
  <exclusions>
    <exclusion>
      <groupId>net.jpountz.lz4</groupId>
      <artifactId>lz4</artifactId>
    </exclusion>
  </exclusions>
</dependency>
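
If the project is built with sbt instead of Maven, an equivalent declaration would look roughly like the sketch below (an assumption mirroring the same artifact, version, and lz4 exclusion; `%%` appends the Scala binary version, so scalaVersion must be 2.11.x to match the `_2.11` artifact):

// build.sbt — a minimal sketch mirroring the Maven dependency above
libraryDependencies += ("org.apache.spark" %% "spark-streaming-kafka" % "1.6.3")
  .exclude("net.jpountz.lz4", "lz4")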

High-level API

package stream

import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object TestStream2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Chapter8_4_2")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    /**
     * The 2 is the number of consumer threads used to read the topic's partitions
     */
    val value: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(ssc, "note01:2181,note02:2181,note03:2181", "group_test", Map("test" -> 2))

    value.map(_._2).filter(_.equals("A")).foreachRDD(
      rdd => {
        // Executed on the Driver
        rdd.foreachPartition(
          p => {
            // Executed on the Executor
            p.foreach(result => println(result))
          }
        )
      }
    )

    ssc.start()
    ssc.awaitTermination()
  }
}

The high-level API maintains offsets by itself; with the low-level API you have to maintain the offsets manually.

Low-level API

In real projects the low-level API is used in most cases to read data from Kafka. When the data volume is large, the cluster's health is unstable, or the network fluctuates, the low-level API makes it possible to recover precisely the data whose consumption failed, because the application itself controls which offsets have been processed.

Wrapping a ZKUtil helper

package utils

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkMarshallingError
import org.I0Itec.zkclient.serialize.ZkSerializer

object ZKUtil {
  def initZKClient(zkServers: String, sessionTimeOut: Int, connectionTimeOut: Int): ZkClient = {
    new ZkClient(zkServers, sessionTimeOut, connectionTimeOut, new ZkSerializer {
      // Write values to ZooKeeper as UTF-8 strings
      override def serialize(o: AnyRef): Array[Byte] = {
        try {
          o.toString.getBytes("UTF-8")
        } catch {
          case _: ZkMarshallingError => null
        }
      }

      // Read node data back as a UTF-8 string
      override def deserialize(bytes: Array[Byte]): AnyRef = {
        try {
          new String(bytes, "UTF-8")
        } catch {
          case _: ZkMarshallingError => null
        }
      }
    })
  }
}
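
For reference, a minimal usage sketch of the helper above (the quorum and znode path are just the example values used in the listing below, and the read assumes the offset node already exists):

// Minimal usage sketch (example values; readData throws if the znode does not exist yet)
val zkClient = ZKUtil.initZKClient("note01:2181,note02:2181,note03:2181", 120000, 60000)
val savedOffset: String = zkClient.readData[String]("/consumers/g_spark_test/offsets/spark_streaming_test/0")
println(savedOffset)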

Consuming data while maintaining offsets yourself

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, TopicMetadataRequest}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import utils.ZKUtil

import scala.collection.mutable
import scala.util.{Success, Try}

object StreamText {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("StreamText")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // Use the same topic everywhere; its offsets are tracked in ZooKeeper under this name
    val topics = Set("spark_streaming_test")
    val kafkaParams = mutable.Map[String, String]()
    kafkaParams.put("bootstrap.servers", "note01:9092")
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("session.timeout.ms", "30000")
    kafkaParams.put("enable.auto.commit", "false")
    kafkaParams.put("max.poll.records", "100")
    // Custom entry, only used below to look up the topic name
    kafkaParams.put("kafka.topics", "spark_streaming_test")
    kafkaParams.put("group.id", "g_spark_test")
    val zkHost = "note01:2181,note02:2181,note03:2181"
    val sessionTimeout = 120000
    val connectionTimeout = 60000
    val zkClient = ZKUtil.initZKClient(zkHost, sessionTimeout, connectionTimeout)

    val zkTopic = "spark_streaming_test"
    val zkConsumerGroupId = "g_spark_test"

    val zKGroupTopicDirs = new ZKGroupTopicDirs(zkConsumerGroupId, zkTopic)
    val zkTopicPath = zKGroupTopicDirs.consumerOffsetDir
    val childrenCount = zkClient.countChildren(zkTopicPath)
    var kafkaStream: InputDStream[(String, String)] = null
    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    /**
     * Offsets for this consumer group are stored in ZooKeeper at
     * /consumers/<groupId>/offsets/<topic>/<partition>
     */
    // Maintain offsets manually
    kafkaStream = if (childrenCount > 0) {
      // Not the first run: offsets already exist in ZooKeeper.
      // Send a TopicMetadataRequest to Kafka; the TopicMetadataResponse describes how the
      // topic's partitions are distributed (which broker leads each partition).
      val req = new TopicMetadataRequest(topics.toList, 0)
      val consumer = new SimpleConsumer("note01", 9092, 10000, 10000, "StreamingOffsetObserver")
      val res = consumer.send(req)
      val option = res.topicsMetadata.headOption
      val partitions = option match {
        case Some(tm) => tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]
        case None => Map[Int, String]()
      }
      for (partition <- 0 until childrenCount) {
        val partitionOffset = zkClient.readData[String](zkTopicPath + "/" + partition)
        val tp = TopicAndPartition(kafkaParams("kafka.topics"), partition)

        // If the saved offset has already been evicted by Kafka's retention, fall back to
        // the earliest offset still available for the partition
        val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
        val consumerMin = new SimpleConsumer(partitions(partition), 9092, 10000, 10000, "getMinOffset")
        val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
        var nextOffset = partitionOffset.toLong
        if (curOffsets.nonEmpty && nextOffset < curOffsets.head) {
          nextOffset = curOffsets.head
        }
        fromOffsets += (tp -> nextOffset)
      }

      val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.key, mam.message)
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams.toMap, fromOffsets, messageHandler)
    } else {
      // First run: no offsets saved in ZooKeeper yet
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams.toMap, topics)
    }

    /**
     * Holds the offset ranges of the current batch
     */
    var offsetRanges: Array[OffsetRange] = null
    val kafkaInputDStream = kafkaStream.transform {
      rdd => {
        offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      }
    }
    val kafkaValues = kafkaInputDStream.map(_._2)

    // Process the data: keep records of the form "a,b,n" where n is an integer greater than 3
    val kafkaSplits = kafkaValues.map(_.split(","))
    val kafkaFilters = kafkaSplits.filter(arr => {
      if (arr.length == 3) {
        Try(arr(2).toInt) match {
          case Success(_) if arr(2).toInt > 3 => true
          case _ => false
        }
      } else {
        false
      }
    })

    val results = kafkaFilters.map(_.mkString(","))
    results.foreachRDD(rdd => {
      // Executed on the Driver
      rdd.foreachPartition(p => {
        // Executed on the Worker (Executor)
        // If the results are written to a database, instantiate the connection here, once per partition
        p.foreach(result => {
          // Executed on the Worker; use the database connection here when persisting results
          println(result)
        })
      })
      // ZkClient is not serializable, so the offsets are written back on the Driver.
      // Persist untilOffset (the end of the processed range) so that a restart resumes
      // after the data that has already been consumed.
      for (o <- offsetRanges) {
        ZkUtils.updatePersistentPath(zkClient, zKGroupTopicDirs.consumerOffsetDir + "/" + o.partition, o.untilOffset.toString)
        println("Offset state after this batch was consumed successfully: " + o)
      }
    })

    ssc.start()
    ssc.awaitTermination()

  }
}


