0
点赞
收藏
分享

微信扫一扫

客快物流大数据项目(六十):将消费的kafka数据转换成bean对象

独孤凌雪 2022-03-12 阅读 65

目录

将消费的kafka数据转换成bean对象

一、将OGG数据转换成bean对象

二、​​​​​​​将Canal数据转换成bean对象

三、完整代码


将消费的kafka数据转换成bean对象

一、​​​​​​​将OGG数据转换成bean对象

实现步骤:

  • 消费kafka的 logistics Topic数据
  • 将消费到的数据转换成OggMessageBean对象
  • 递交作业启动运行

实现过程:

  • 消费kafka的 logistics Topic数据
//2.1:获取物流系统相关的数据
val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)
  • 将消费到的数据转换成OggMessageBean对象
    • 默认情况下表名带有数据库名,因此需要删除掉数据库名
//3.1:物流相关数据的转换
val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
  iters.map(row => {
    //获取到value列的值(字符串)
    val jsonStr: String = row.getAs[String](0)
    //将字符串转换成javabean对象
    JSON.parseObject(jsonStr, classOf[OggMessageBean])
  }).toList.iterator
})(Encoders.bean(classOf[OggMessageBean]))
  • 递交作业启动运行
// 设置Streaming应用输出及启动
logisticsDF.writeStream.outputMode(OutputMode.Update())
  .format("console").queryName("logistics").start()

 

二、​​​​​​​将Canal数据转换成bean对象

实现步骤:

  • 消费kafka的 crm Topic数据
  • 将消费到的数据转换成 CanalMessageBean 对象
  • 递交作业启动运行

实现过程:

  • 消费kafka的 crm Topic数据
//2.2:获取客户关系系统相关的数据
val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)
  • 将消费到的数据转换成CanalMessageBean 对象
//3.2:客户关系相关数据的转换
val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
  //canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
  iters.filter(row=>{
    //取到value列的数据
    val line: String = row.getAs[String](0)
    //如果value列的值不为空,且是清空表的操作
    if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
  }).map(row=>{
    //取到value列的数据
    val jsonStr: String = row.getAs[String](0)
    //将json字符串转换成javaBean对象
    JSON.parseObject(jsonStr, classOf[CanalMessageBean])
  }).toList.toIterator
})(Encoders.bean(classOf[CanalMessageBean]))
  • 递交作业启动运行
crmDF.writeStream.outputMode(OutputMode.Update())
  .format("console").queryName("crm").start()

三、完整代码

package cn.it.logistics.etl.realtime
import java.sql.Connection

import cn.it.logistics.common.{Configuration, SparkUtils, TableMapping, Tools}
import cn.it.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
import cn.it.logistics.etl.parser.DataParser
import com.alibaba.fastjson.JSON
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}

/**
 * 实现KUDU数据库的实时ETL操作
 */
object KuduStreamApp2 extends StreamApp {

  /**
   * 入口方法
   * @param args
   */
  def main(args: Array[String]): Unit = {
    //创建sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(this.getClass.getSimpleName)
    )

    //数据处理
    execute(sparkConf)
  }

  /**
   * 数据的处理
   *
   * @param sparkConf
   */
  override def execute(sparkConf: SparkConf): Unit = {
    /**
     * 实现步骤:
     * 1)创建sparksession对象
     * 2)获取数据源(获取物流相关数据以及crm相关数据)
     * 3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
     * 4)抽取每条数据的字段信息
     * 5)将过滤出来的每张表写入到kudu数据库
     */
    //1)创建sparksession对象
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //2)获取数据源(获取物流相关数据以及crm相关数据)
    //2.1:获取物流系统相关的数据
    val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)

    //2.2:获取客户关系系统相关的数据
    val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)

    //导入隐式转换
    import  sparkSession.implicits._

    //导入自定义的POJO的隐士转换
    import  cn.itcast.logistics.common.BeanImplicit._

    //3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
    //3.1:物流相关数据的转换
    val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
      iters.map(row => {
        //获取到value列的值(字符串)
        val jsonStr: String = row.getAs[String](0)
        //将字符串转换成javabean对象
        JSON.parseObject(jsonStr, classOf[OggMessageBean])
      }).toList.iterator
    })(Encoders.bean(classOf[OggMessageBean]))

    //3.2:客户关系相关数据的转换
    val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
      //canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
      iters.filter(row=>{
        //取到value列的数据
        val line: String = row.getAs[String](0)
        //如果value列的值不为空,且是清空表的操作
        if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
      }).map(row=>{
        //取到value列的数据
        val jsonStr: String = row.getAs[String](0)
        //将json字符串转换成javaBean对象
        JSON.parseObject(jsonStr, classOf[CanalMessageBean])
      }).toList.toIterator
    })(Encoders.bean(classOf[CanalMessageBean]))

    //输出数据
    /**
     * +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
     * |               after|              before|          current_ts|               op_ts|op_type|                 pos|              table|
     * +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
     * |[eid -> [], cdt -...|[eid -> [], cdt -...|2020-10-10T02:35:...|2020-10-10 02:35:...|      U|00000000200006647808|tbl_collect_package|
     * +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
     */
    logsticsMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("logistics").start()

    /**
     * +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
     * |                data|database|  ddl|           es| id|           mysqlType|               old|sql|             sqlType|      table|           ts|  type|
     * +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
     * |[[cdt -> [], gis_...|     crm|false|1602297244000| 18|[cdt -> [], gis_a...|[ {"gis_addr":"1"}]|   |[cdt -> [], gis_a...|tbl_address|1602297244211|UPDATE|
     * +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
     */
    crmMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("crm").start()

    //8)启动运行等待停止
    val stream = sparkSession.streams
    //stream.active:获取当前活动流式查询的列表
    stream.active.foreach(query => println(s"准备启动的查询:${query.name}"))
    //线程阻塞,等待终止
    stream.awaitAnyTermination()
  }

  /**
   * 数据的保存
   * @param dataFrame
   * @param tableName
   * @param isAutoCreateTable
   */
  override def save(dataFrame: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
  }

 


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
举报

相关推荐

0 条评论