0
点赞
收藏
分享

微信扫一扫

spark stream 3.0.0 scala版本写入kafka消息数据

是她丫 2022-02-18 阅读 59

这里实际上是调用kafka客户端来执行kafka消息数据写入的。这里模拟随机产生一系列数据,持续写入kafka,形成持续的消息流数据。

1. 添加依赖

<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.1.0</version>
        </dependency>
</dependencies>

2. 测试代码

package com.demo

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object MockerRealTime {

  /**
   * 模拟的数据
   *
   * 格式 :timestamp area city userid adid
   * 某个时间点 某个地区 某个城市 某个用户 某个广告
   */
  def generateMockData(): Array[String] = {
    val array: ArrayBuffer[String] = ArrayBuffer[String]()
    val CityRandomOpt = RandomOptions(RanOpt(CityInfo(1, "北京", "华北"), 30),
      RanOpt(CityInfo(2, "上海", "华东"), 30),
      RanOpt(CityInfo(3, "广州", "华南"), 10),
      RanOpt(CityInfo(4, "深圳", "华南"), 20),
      RanOpt(CityInfo(5, "天津", "华北"), 10))
    val random = new Random()
    // 模拟实时数据:
    // timestamp province city userid adid
    for (i <- 0 to 50) {
      val timestamp: Long = System.currentTimeMillis()
      val cityInfo: CityInfo = CityRandomOpt.getRandomOpt
      val city: String = cityInfo.city_name
      val area: String = cityInfo.area
      val adid: Int = 1 + random.nextInt(6)
      val userid: Int = 1 + random.nextInt(6)
      // 拼接实时数据
      array += timestamp + " " + area + " " + city + " " + userid + " " + adid
    }
    array.toArray
  }

  def createKafkaProducer(broker: String): KafkaProducer[String, String] = {
    // 创建配置对象
    val prop = new Properties()
    // 添加配置
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    // 根据配置创建 Kafka 生产者
    new KafkaProducer[String, String](prop)
  }

  def main(args: Array[String]): Unit = {
    // 获取配置文件 config.properties 中的 Kafka 配置参数
    val config: Properties = PropertiesUtil.load("config.properties")
    val broker: String = config.getProperty("kafka.broker.list")
    val topic = "test"
    // 创建 Kafka 消费者
    val kafkaProducer: KafkaProducer[String, String] = createKafkaProducer(broker)
    while (true) {
      // 随机产生实时数据并通过 Kafka 生产者发送到 Kafka 集群中
      for (line <- generateMockData()) {
        kafkaProducer.send(new ProducerRecord[String, String](topic, line))
        println(line)
      }
      Thread.sleep(2000)
    }
  }

}

kafka生产者参数配置主要由createKafkaProducer完成。

主要的配置内容时kafka的ip地址,端口号,topic以及key和value的序列化。

3. kafka配置(config.properties)

# Kafka 配置
kafka.broker.list=192.168.22.56:9092

4. 辅助代码(PropertiesUtil.scala)

package com.demo

import java.io.InputStreamReader
import java.util.Properties


object PropertiesUtil {

  def load(propertiesName:String): Properties ={
    val prop=new Properties()
    prop.load(new
        InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(propertiesName) , "UTF-8"))
    prop
  }

}

5. 辅助代码(RandomOptions.scala)

package com.demo

import scala.collection.mutable.ListBuffer
import scala.util.Random

case class RanOpt[T](value: T, weight: Int)


object RandomOptions {

  def apply[T](opts: RanOpt[T]*): RandomOptions[T] = {
    val randomOptions = new RandomOptions[T]()
    for (opt <- opts) {
      randomOptions.totalWeight += opt.weight
      for (i <- 1 to opt.weight) {
        randomOptions.optsBuffer += opt.value
      }
    }
    randomOptions
  }

}

class RandomOptions[T](opts: RanOpt[T]*) {
  var totalWeight = 0
  var optsBuffer = new ListBuffer[T]

  def getRandomOpt: T = {
    val randomNum: Int = new Random().nextInt(totalWeight)
    optsBuffer(randomNum)
  }
}

6. 辅助代码(CityInfo.scala)

package com.demo

/**
 *
 * 城市信息表
 *
 * @param city_id 城市 id
 * @param city_name 城市名称
 * @param area 城市所在大区
 */
case class CityInfo (city_id:Long,
                     city_name:String,
                     area:String)

7. 执行程序测试

可以同时看到idea控制台和kafka的命令行消费者输出。

1645151518980 华南 深圳 6 6
1645151518980 华南 深圳 2 3
1645151518980 华南 深圳 4 6
1645151518980 华东 上海 3 6
1645151518980 华北 北京 2 4
1645151518980 华东 上海 6 2
1645151518980 华北 北京 2 1

kafka消息输出。

 

举报

相关推荐

0 条评论