import java.sql.{DriverManager, PreparedStatement}
import com.alibaba.fastjson.JSON
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
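// Exports the tips listen-finish records from the Hive ads layer into TiDB,
// one sharded target table per user_part bucket, via batched JDBC REPLACE INTO.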
object TipsRurnPay2ToTidb {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("TipsRurnPay2ToTidb")
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("hive.exec.max.dynamic.partitions", "10000")
      .config("hive.new.job.grouping.set.cardinality", "64")
      .config("spark.sql.crossJoin.enabled", "true")
      .config("spark.debug.maxToStringFields", "1024")
      .enableHiveSupport()
      .getOrCreate()
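    // Schema of the exported rows; kept for reference only. It is never applied
    // below, since the DataFrame comes straight from spark.sql.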
    val schema = StructType(List(
      StructField("user_id", IntegerType, true),
      StructField("story_id", IntegerType, true),
      StructField("voice_type", IntegerType, true)
    ))
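    // The source table is bucketed by user_part into 10 shards; export each one in turn.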
    for (i <- 0 until 10) {
      val part = i.toString
      val TABLE_NAME = s"xx_story_user_listen_record${part}" -> "user tips-listening record"
      val FULL_SQL =
        s"""
           |select
           |  cast(user_id as int) as user_id
           |  ,story_id
           |  ,voice_type
           |from ads.ads_xx_tips_rurn_pay_user_listen_story_finish_a_d
           |where user_part = '${part}'
           |""".stripMargin
println("待写入表名:" + TABLE_NAME._1 + ",待写入数据量", spark.sql(FULL_SQL).count())
try {
val df: DataFrame = spark.sql(FULL_SQL)
        df.toJSON
          .foreachPartition { tipsJsonItr =>
            // One JDBC connection per partition; manual commits keep batches atomic.
            val conn = DriverManager.getConnection(
              "jdbc:mysql://tidb.dev.xxx.com:4000/xx_content_assets", "root", "123456")
            conn.setAutoCommit(false)
            // Write to the per-part shard named in TABLE_NAME; the original statement
            // hard-coded the unsuffixed table, ignoring the shard suffix it logs.
            val sql = s"REPLACE INTO ${TABLE_NAME._1} (user_id, story_id, voice_type) VALUES (?, ?, ?)"
            val pstat: PreparedStatement = conn.prepareStatement(sql)
            try {
              var i = 0
              tipsJsonItr.foreach { json =>
                val bean = JSON.parseObject(json, classOf[tipsBean])
                pstat.setObject(1, bean.user_id)
                pstat.setObject(2, bean.story_id)
                pstat.setObject(3, bean.voice_type)
                // addBatch exactly once per row (the original queued every row twice)
                pstat.addBatch()
                i += 1
                if (i >= 1500) { // flush every 1500 rows
                  pstat.executeBatch()
                  conn.commit()
                  i = 0
                }
              }
              if (i > 0) { // flush the final partial batch; the old i < 2000 bound was redundant given the 1500-row flush
                pstat.executeBatch()
                conn.commit()
              }
            } finally {
              // Always release JDBC resources, even if a row fails to parse or write.
              pstat.close()
              conn.close()
            }
          }
println(s"${TABLE_NAME._1}" + " 数据写入成功")
      }
      catch {
        // Log the failure and continue with the next shard instead of aborting the job.
        case e: Exception => e.printStackTrace()
      }
    }
  }
}
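// JSON deserialization target for fastjson; field names must match the keys
// produced by df.toJSON (user_id, story_id, voice_type).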
case class tipsBean(
  user_id: Int,
  story_id: Int,
  voice_type: Int
)