0
点赞
收藏
分享

微信扫一扫

操作 ClickHouse 数据库

墨春 2022-02-15 阅读 44

用 DBeaver 工具连接 ClickHouse 数据库
建表语句:

-- Creates the ldy_pageclick table in the kafka_clickhouse_os database.
-- The "***" fragments inside the COMMENT strings were masked in the published
-- text and are kept as-is; only the stray single quotes that left the string
-- literals unbalanced (and the statement unparseable) have been removed.
CREATE TABLE kafka_clickhouse_os.ldy_pageclick
(
`id` String COMMENT '***ID',
`ldy_type` String COMMENT '***类型',
`ldy_platform` String COMMENT '***',
`ldy_id` Int64 COMMENT '***Id',
`app_id` String COMMENT '***Id',
`ip` String COMMENT 'ip***',
`ua` String COMMENT '用户***',
`click_time` DateTime COMMENT '***时间',
`channel_id` Int64 COMMENT '***ID',
`channel_name` String COMMENT '***名字',
`book_id` Int64 COMMENT '***ID',
`user_tag` String COMMENT '***ID',
`book_name` String COMMENT '***名'
)
ENGINE = MergeTree
ORDER BY ldy_id
SETTINGS index_granularity = 8192

测试类:

import com.zw.bigdata.yd.land.dwd.AdAndLandStat.jobName
import com.zw.bigdata.yd.land.kafka.AdConsumerKafkaSchema
import com.zw.bigdata.yd.land.pojo.{LandPvUvRecord, UniformAdvertRecord}
import org.apache.flink.api.scala._
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
 * Flink streaming job that reads [[LandPvUvRecord]] messages from Kafka and
 * inserts them into a ClickHouse table through the Flink JDBC sink.
 *
 * All connection settings are read once, at object initialization, through
 * `EnvConfig.getConfigValue`.
 */
object AdLandToCH {

  // Names used by this object that the file-level imports do not provide.
  import java.sql.PreparedStatement
  import org.apache.flink.api.common.restartstrategy.RestartStrategies
  import org.apache.flink.api.common.time.Time
  import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
  import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
  import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
  import org.apache.flink.streaming.api.environment.CheckpointConfig

  // Read the relevant settings from the corresponding configuration file.
  val flinkCheckpointPath = EnvConfig.getConfigValue("yd.land.flink.checkout.path")

  val kafkaServer = EnvConfig.getConfigValue("yd.land.kafka.bootstrap.servers")

  val kafkaGroupId = EnvConfig.getConfigValue("yd.land.kafka.group.id")

  val topicAdName = EnvConfig.getConfigValue("yd.land.dws.ad.kafka.topic.name")
  val topicPvUvName = EnvConfig.getConfigValue("yd.land.dws.puv.kafka.topic.name")

  val jobName = EnvConfig.getConfigValue("yd.land.job.name")

  val offsetPosition = EnvConfig.getConfigValue("yd.land.offset.position")

  val landTableName = EnvConfig.getConfigValue("yd.land.clickhouse.tablename")

  val adTableName = EnvConfig.getConfigValue("yd.ad.clickhouse.tablename")

  val driverName = EnvConfig.getConfigValue("yd.clickhouse.driverName")

  val dataBase = EnvConfig.getConfigValue("yd.clickhouse.databse")

  val clickhouseIp = EnvConfig.getConfigValue("yd.clickhouse.Ip")

  val userName = EnvConfig.getConfigValue("yd.clickhouse.userName")

  val password = EnvConfig.getConfigValue("yd.clickhouse.password")

  val timeoutKafka = 15 * 60 * 1000

  val ODS_YD_PREFIX = "dwd_yd_land"

  /**
   * Job entry point: obtains the stream environment from
   * `FlinkExecutionEnvUtil`, wires the Kafka -> ClickHouse pipeline and
   * submits the job under [[jobName]].
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val environment = FlinkExecutionEnvUtil.getStreamEnv(flinkCheckpointPath)
    landPvUvToCkSink(environment)
    environment.execute(jobName)
  }

  /**
   * Builds a stream environment with exactly-once checkpointing whose
   * interval, timeout, concurrency and restart strategy depend on the active
   * environment name.
   *
   * @param checkpointPath location handed to `FileSystemCheckpointStorage`
   * @return the configured environment
   */
  def getStreamEnv(checkpointPath: String): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Pattern-match on the active environment and apply the matching settings.
    // NOTE(review): `envActivate` is not defined anywhere in the visible part
    // of this file; it must be supplied before this method can compile.
    envActivate match {
      case "prod" =>
        applyCheckpointSettings(env, checkpointPath, intervalMs = 600000L, timeoutMs = 500000L, maxConcurrent = 1)
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.minutes(3)))

      case "test" =>
        // No restart strategy is set for "test" (the call was commented out
        // in the original), so the default strategy applies.
        applyCheckpointSettings(env, checkpointPath, intervalMs = 1200000L, timeoutMs = 1000000L, maxConcurrent = 5)

      case _ =>
        applyCheckpointSettings(env, checkpointPath, intervalMs = 60000L, timeoutMs = 50000L, maxConcurrent = 2)
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.minutes(1)))
    }

    env
  }

  /**
   * Applies the checkpoint, time-characteristic and state-backend settings
   * that are common to every environment; only the three numeric values vary.
   *
   * NOTE(review): the Flink documentation states that a minimum pause between
   * checkpoints implies a single concurrent checkpoint, so `maxConcurrent`
   * values above 1 may have no effect here - confirm against the Flink
   * version in use.
   */
  private def applyCheckpointSettings(
      env: StreamExecutionEnvironment,
      checkpointPath: String,
      intervalMs: Long,
      timeoutMs: Long,
      maxConcurrent: Int): Unit = {
    env.enableCheckpointing(intervalMs)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
    env.getCheckpointConfig.setCheckpointTimeout(timeoutMs)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(maxConcurrent)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
    env.getCheckpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath))
  }

  /**
   * Attaches the Kafka source for [[LandPvUvRecord]] to `env`, prints every
   * record and writes it to the ClickHouse table named by [[landTableName]].
   *
   * @param env stream environment the source and sinks are registered on
   */
  def landPvUvToCkSink(env: StreamExecutionEnvironment): Unit = {

    val landPuvConsumer = FlinkKafkaUtil.getFlinkKafkaConsume[LandPvUvRecord](kafkaServer, kafkaGroupId, topicPvUvName, offsetPosition,
      new AdConsumerKafkaSchema[LandPvUvRecord])
    // NOTE(review): offsetPosition is passed to the helper above, yet the
    // start position is then explicitly set to "earliest" - confirm which of
    // the two is intended to win.
    landPuvConsumer.setStartFromEarliest()

    val ydLandPuvDS = env.addSource(landPuvConsumer)

    val landAdvertSink: SinkFunction[LandPvUvRecord] = JdbcSink.sink(
      s"""insert into ${dataBase}.${landTableName} (`landDate`,`landId`,`landName`,`landPosition`,`pv`,`uv`,`appName`,`appWx`,`bookId`,`channelBookId`,`bookName`) values(?,?,?,?,?,?,?,?,?,?,?)""",
      new JdbcStatementBuilder[LandPvUvRecord] {
        // Parameter indexes follow the column order of the INSERT statement;
        // absent optional fields are written as empty strings.
        override def accept(ps: PreparedStatement, tp: LandPvUvRecord): Unit = {
          ps.setString(1, tp.landDate)
          ps.setString(2, tp.landId)
          ps.setString(3, tp.landName)
          ps.setString(4, tp.landPosition)
          ps.setInt(5, tp.pv)
          ps.setInt(6, tp.uv)
          ps.setString(7, tp.appName.getOrElse(""))
          ps.setString(8, tp.appWx.getOrElse(""))
          ps.setString(9, tp.bookId.getOrElse(""))
          ps.setString(10, tp.channelBookId.getOrElse(""))
          ps.setString(11, tp.bookName.getOrElse(""))
        }
      },
      // NOTE(review): a batch size of 2 sends an INSERT for every two records;
      // ClickHouse generally favors much larger batches - confirm this value
      // is only meant for testing.
      new JdbcExecutionOptions.Builder().withBatchSize(2).build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withDriverName(s"${driverName}")
        .withUrl(s"jdbc:clickhouse://${clickhouseIp}:8123/${dataBase}?useUnicode=true&characterEncoding=utf-8&useSSL=false")
        .withUsername(s"${userName}")
        .withPassword(s"${password}")
        .build()
    )

    ydLandPuvDS.print()
    ydLandPuvDS.addSink(landAdvertSink)
  }
}

加载配置文件的图片:
在这里插入图片描述
样例类:
LandPvUvRecord:

package com.xx.bigdata.yd.land.pojo

/**
 * One page-view / unique-visitor record, as consumed from Kafka and written
 * to ClickHouse by `AdLandToCH.landPvUvToCkSink`.
 *
 * The five `Option` fields may be absent; the ClickHouse sink stores an empty
 * string for each one that is `None`.
 *
 * @param landDate      date of the record, kept as a string
 * @param landId        landing-page identifier
 * @param appName       optional application name
 * @param appWx         optional `appWx` value
 * @param bookId        optional book identifier
 * @param channelBookId optional channel book identifier
 * @param bookName      optional book name
 * @param landName      landing-page name
 * @param landPosition  landing-page position
 * @param pv            page-view count
 * @param uv            unique-visitor count
 */
case class LandPvUvRecord(
  landDate: String,
  landId: String,
  appName: Option[String],
  appWx: Option[String],
  bookId: Option[String],
  channelBookId: Option[String],
  bookName: Option[String],
  landName: String,
  landPosition: String,
  pv: Int,
  uv: Int
)
举报

相关推荐

0 条评论