使用 DBeaver 工具连接 ClickHouse 数据库。
建表语句:
-- Landing-page click table (column descriptions appear as '***' in this copy).
-- The COMMENT literals previously had unbalanced quotes (e.g. '***'ID'), which made the
-- statement unparseable; each comment is now a single properly quoted string.
-- NOTE(review): the Flink JDBC sink in AdLandToCH inserts columns landDate, landId, landName,
-- landPosition, pv, uv, appName, appWx, bookId, channelBookId, bookName, none of which exist
-- here; confirm that yd.land.clickhouse.tablename refers to a different table.
CREATE TABLE kafka_clickhouse_os.ldy_pageclick
(
    `id` String COMMENT '***ID',
    `ldy_type` String COMMENT '***类型',
    `ldy_platform` String COMMENT '***',
    `ldy_id` Int64 COMMENT '***Id',
    `app_id` String COMMENT '***Id',
    `ip` String COMMENT 'ip***',
    `ua` String COMMENT '用户***',
    `click_time` DateTime COMMENT '***时间',
    `channel_id` Int64 COMMENT '***ID',
    `channel_name` String COMMENT '***名字',
    `book_id` Int64 COMMENT '***ID',
    `user_tag` String COMMENT '***ID',
    `book_name` String COMMENT '***名'
)
ENGINE = MergeTree
ORDER BY ldy_id
SETTINGS index_granularity = 8192
测试类:
import java.sql.PreparedStatement

import com.zw.bigdata.yd.land.dwd.AdAndLandStat.jobName
import com.zw.bigdata.yd.land.kafka.AdConsumerKafkaSchema
import com.zw.bigdata.yd.land.pojo.{LandPvUvRecord, UniformAdvertRecord}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
 * Flink streaming job that consumes landing-page PV/UV records ([[LandPvUvRecord]])
 * from Kafka and appends them to a ClickHouse table through the Flink JDBC sink.
 *
 * All settings below are read from `EnvConfig` when the object is first initialised.
 */
object AdLandToCH {
  // Settings read from the active environment's configuration file.
  // NOTE(review): the keys "yd.land.flink.checkout.path" and "yd.clickhouse.databse" look
  // misspelled, but they must match the existing config files, so they are kept unchanged.
  val flinkCheckpointPath = EnvConfig.getConfigValue("yd.land.flink.checkout.path")
  val kafkaServer = EnvConfig.getConfigValue("yd.land.kafka.bootstrap.servers")
  val kafkaGroupId = EnvConfig.getConfigValue("yd.land.kafka.group.id")
  val topicAdName = EnvConfig.getConfigValue("yd.land.dws.ad.kafka.topic.name")
  val topicPvUvName = EnvConfig.getConfigValue("yd.land.dws.puv.kafka.topic.name")
  val jobName = EnvConfig.getConfigValue("yd.land.job.name")
  val offsetPosition = EnvConfig.getConfigValue("yd.land.offset.position")
  val landTableName = EnvConfig.getConfigValue("yd.land.clickhouse.tablename")
  val adTableName = EnvConfig.getConfigValue("yd.ad.clickhouse.tablename")
  val driverName = EnvConfig.getConfigValue("yd.clickhouse.driverName")
  val dataBase = EnvConfig.getConfigValue("yd.clickhouse.databse")
  val clickhouseIp = EnvConfig.getConfigValue("yd.clickhouse.Ip")
  val userName = EnvConfig.getConfigValue("yd.clickhouse.userName")
  val password = EnvConfig.getConfigValue("yd.clickhouse.password")
  // 15 minutes in milliseconds; not referenced inside this object.
  val timeoutKafka = 15 * 60 * 1000
  val ODS_YD_PREFIX = "dwd_yd_land"

  /** Rows buffered per JDBC batch; ClickHouse handles few large inserts far better than many tiny ones. */
  private val SinkBatchSize = 1000

  /** Upper bound (ms) a partially filled batch waits before it is flushed anyway. */
  private val SinkBatchIntervalMs = 2000L

  /** Job entry point: builds the environment, wires the Kafka source to the ClickHouse sink, and runs it. */
  def main(args: Array[String]): Unit = {
    val environment = FlinkExecutionEnvUtil.getStreamEnv(flinkCheckpointPath)
    landPvUvToCkSink(environment)
    environment.execute(jobName)
  }

  /**
   * Creates a stream environment with exactly-once checkpointing, an incremental RocksDB
   * state backend and file-system checkpoint storage, tuned per `envActivate` profile
   * ("prod", "test", or anything else).
   *
   * @param checkpointPath location where checkpoints are stored
   * @return the configured environment
   */
  def getStreamEnv(checkpointPath: String): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // Event time is Flink's default since 1.12 (this code needs >= 1.13 for
    // EmbeddedRocksDBStateBackend), so the deprecated setStreamTimeCharacteristic call is gone.
    envActivate match {
      case "prod" =>
        configureCheckpointing(env, checkpointPath, intervalMs = 600000L, timeoutMs = 500000L, maxConcurrent = 1)
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.minutes(3)))
      case "test" =>
        // NOTE(review): with a 1000 ms minimum pause Flink runs at most one checkpoint at a
        // time, so maxConcurrent = 5 has no effect here.
        configureCheckpointing(env, checkpointPath, intervalMs = 1200000L, timeoutMs = 1000000L, maxConcurrent = 5)
        // No restart strategy is set for this profile, so Flink's configured default applies.
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.minutes(1)))
      case _ =>
        configureCheckpointing(env, checkpointPath, intervalMs = 60000L, timeoutMs = 50000L, maxConcurrent = 2)
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.minutes(1)))
    }
    env
  }

  /**
   * Applies the checkpoint and state-backend settings shared by every profile.
   *
   * @param env            environment configured in place
   * @param checkpointPath location handed to FileSystemCheckpointStorage
   * @param intervalMs     checkpoint interval
   * @param timeoutMs      time after which an in-progress checkpoint is discarded
   * @param maxConcurrent  maximum number of concurrent checkpoints
   */
  private def configureCheckpointing(
      env: StreamExecutionEnvironment,
      checkpointPath: String,
      intervalMs: Long,
      timeoutMs: Long,
      maxConcurrent: Int): Unit = {
    env.enableCheckpointing(intervalMs)
    val checkpointConfig = env.getCheckpointConfig
    checkpointConfig.setMinPauseBetweenCheckpoints(1000)
    checkpointConfig.setCheckpointTimeout(timeoutMs)
    checkpointConfig.setMaxConcurrentCheckpoints(maxConcurrent)
    // Keep externalized checkpoints after cancellation so the job can be resumed from them.
    checkpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // `true` enables incremental RocksDB checkpoints.
    env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
    checkpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath))
  }

  /**
   * Reads [[LandPvUvRecord]]s from the PV/UV Kafka topic and appends each one to
   * `dataBase.landTableName` in ClickHouse via a batched JDBC sink.
   *
   * @param env environment to attach the source and sink to; the caller executes it
   */
  def landPvUvToCkSink(env: StreamExecutionEnvironment): Unit = {
    // The start position is governed by `offsetPosition` (presumably applied inside
    // FlinkKafkaUtil.getFlinkKafkaConsume — confirm). The former unconditional
    // setStartFromEarliest() silently overrode that setting, so every run without restored
    // state re-read the topic from the beginning.
    val landPuvConsumer = FlinkKafkaUtil.getFlinkKafkaConsume[LandPvUvRecord](
      kafkaServer, kafkaGroupId, topicPvUvName, offsetPosition,
      new AdConsumerKafkaSchema[LandPvUvRecord])
    val ydLandPuvDS = env.addSource(landPuvConsumer)

    // NOTE(review): useUnicode/characterEncoding/useSSL in the URL below are MySQL Connector/J
    // options; confirm the ClickHouse driver in use accepts (or ignores) them.
    val connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      .withDriverName(s"${driverName}")
      .withUrl(s"jdbc:clickhouse://${clickhouseIp}:8123/${dataBase}?useUnicode=true&characterEncoding=utf-8&useSSL=false")
      .withUsername(s"${userName}")
      .withPassword(s"${password}")
      .build()

    // Flush when either the batch is full or the interval elapses, so a trailing record is
    // not held back until the next checkpoint.
    val executionOptions = new JdbcExecutionOptions.Builder()
      .withBatchSize(SinkBatchSize)
      .withBatchIntervalMs(SinkBatchIntervalMs)
      .build()

    val landAdvertSink: SinkFunction[LandPvUvRecord] = JdbcSink.sink(
      s"""insert into ${dataBase}.${landTableName} (`landDate`,`landId`,`landName`,`landPosition`,`pv`,`uv`,`appName`,`appWx`,`bookId`,`channelBookId`,`bookName`) values(?,?,?,?,?,?,?,?,?,?,?)""",
      new JdbcStatementBuilder[LandPvUvRecord] {
        // Parameters are bound in the same order as the column list above; absent optional
        // fields are written as empty strings rather than NULL.
        override def accept(ps: PreparedStatement, tp: LandPvUvRecord): Unit = {
          ps.setString(1, tp.landDate)
          ps.setString(2, tp.landId)
          ps.setString(3, tp.landName)
          ps.setString(4, tp.landPosition)
          ps.setInt(5, tp.pv)
          ps.setInt(6, tp.uv)
          ps.setString(7, tp.appName.getOrElse(""))
          ps.setString(8, tp.appWx.getOrElse(""))
          ps.setString(9, tp.bookId.getOrElse(""))
          ps.setString(10, tp.channelBookId.getOrElse(""))
          ps.setString(11, tp.bookName.getOrElse(""))
        }
      },
      executionOptions,
      connectionOptions
    )

    // Debug output: echoes every record to stdout of the task managers.
    ydLandPuvDS.print()
    ydLandPuvDS.addSink(landAdvertSink)
  }
}
加载配置文件的图片（原图未包含在本文中）：
样例类:
LandPvUvRecord:
package com.xx.bigdata.yd.land.pojo
/**
 * One landing-page PV/UV record, deserialized from Kafka (via AdConsumerKafkaSchema)
 * and written to ClickHouse by the AdLandToCH job.
 *
 * NOTE(review): pv/uv presumably mean page views / unique visitors — confirm upstream.
 *
 * @param landDate      date of the record, kept as a String (format not shown here)
 * @param landId        landing-page identifier
 * @param appName       optional app name; the ClickHouse sink writes "" when absent
 * @param appWx         optional app WeChat identifier; written as "" when absent
 * @param bookId        optional book identifier; written as "" when absent
 * @param channelBookId optional channel-side book identifier; written as "" when absent
 * @param bookName      optional book name; written as "" when absent
 * @param landName      landing-page name
 * @param landPosition  landing-page position
 * @param pv            pv count
 * @param uv            uv count
 */
case class LandPvUvRecord(
  landDate: String,
  landId: String,
  appName: Option[String],
  appWx: Option[String],
  bookId: Option[String],
  channelBookId: Option[String],
  bookName: Option[String],
  landName: String,
  landPosition: String,
  pv: Int,
  uv: Int
)