0
点赞
收藏
分享

微信扫一扫

Spark 程序将 Hive 表亿条数据写入 TiDB

RIOChing 2022-03-12 阅读 59
import java.sql.{Connection, DriverManager, PreparedStatement, SQLException}
import com.alibaba.fastjson.JSON
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 *  创建人:  xiaotao
 *  创建日期: Created on  2022/03/09
 *    数据开发功能描述:
 *     需求方: 158741600条数据
 *      只跑一次,后续数据由服务端做
 *
 *     数据流向:hiveToTidb
 *
 */
object TipsRurnPay2ToTidb {

  /**
   * One-off migration job: copies ~158 million rows from the Hive table
   * ads.ads_xx_tips_rurn_pay_user_listen_story_finish_a_d into the TiDB table
   * xx_story_user_listen_record, processing one `user_part` bucket (0-9) per
   * iteration and writing through JDBC with batched REPLACE statements.
   *
   * Fixes vs. the original:
   *  - `addBatch()` was called twice per row, doubling every write in a batch.
   *  - The final-flush guard `i > 0 && i < 2000` contained dead logic; a plain
   *    `pending > 0` is the correct condition.
   *  - The source query was executed twice (once for the count, once for the
   *    write); the DataFrame is now built once and reused.
   *  - `println(msg, count)` auto-tupled and printed a tuple; now concatenated.
   *  - JDBC resources are released in `finally` so a mid-partition failure no
   *    longer leaks the connection/statement.
   */
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder
      .appName("TipsRurnPay2ToTidb")
      //.master("local[*]")
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("hive.exec.max.dynamic.partitions", "10000")
      .config("hive.new.job.grouping.set.cardinality", "64")
      .config("spark.sql.crossJoin.enabled", "true")
      .config("spark.debug.maxToStringFields", "1024")
      .enableHiveSupport()
      .getOrCreate()

    // Rows accumulated per JDBC batch before executeBatch()/commit().
    val batchSize = 1500

    for (i <- 0 until 10) {
      val part = i.toString
      // Target table name paired with its Chinese description (description is
      // only used for logging).
      val tableName = s"xx_story_user_listen_record${part}" -> "用户收听小知识记录"

      val fullSql =
        s"""
           |select
           |       cast(user_id as int) as user_id
           |      ,story_id
           |      ,voice_type
           |from ads.ads_xx_tips_rurn_pay_user_listen_story_finish_a_d
           |where user_part = '${part}'
           |""".stripMargin

      try {
        // Build the DataFrame once; reuse it for both the count and the write
        // (the original ran the Hive query twice).
        val df: DataFrame = spark.sql(fullSql)
        println("待写入表名:" + tableName._1 + ",待写入数据量" + df.count())

        df.toJSON.foreachPartition { tipsJsonItr: Iterator[String] =>
          // NOTE(review): credentials are hard-coded; move to configuration /
          // a secrets store before reusing this job.
          val conn = DriverManager.getConnection(
            "jdbc:mysql://tidb.dev.xxx.com:4000/xx_content_assets", "root", "123456")
          try {
            conn.setAutoCommit(false) // commit manually, once per batch
            val sql = "REPLACE INTO xx_story_user_listen_record (user_id, story_id, voice_type) VALUES (?, ?, ?)"
            val pstat: PreparedStatement = conn.prepareStatement(sql)
            try {
              var pending = 0
              tipsJsonItr.foreach { json =>
                val bean = JSON.parseObject(json, classOf[tipsBean]) // JSON -> bean
                pstat.setObject(1, bean.user_id)
                pstat.setObject(2, bean.story_id)
                pstat.setObject(3, bean.voice_type)
                // Exactly one addBatch per row (the original added each row twice).
                pstat.addBatch()
                pending += 1
                if (pending >= batchSize) {
                  pstat.executeBatch()
                  conn.commit()
                  pending = 0
                }
              }
              // Flush whatever is left after the last full batch.
              if (pending > 0) {
                pstat.executeBatch()
                conn.commit()
              }
            } finally {
              pstat.close()
            }
          } finally {
            conn.close()
          }
        }
        println(s"${tableName._1}" + "  数据写入成功")
      } catch {
        case e: Exception => println(e)
      }
    }

    spark.stop()
  }
}

/**
 * Bean mirroring one JSON row produced by `Dataset.toJSON`; fastjson maps the
 * JSON keys onto these fields by name, so the (unidiomatic lowercase) class
 * and field names must stay exactly as the callers and JSON expect.
 */
case class tipsBean(user_id: Int, story_id: Int, voice_type: Int)
举报

相关推荐

0 条评论