0
点赞
收藏
分享

微信扫一扫

flink sql 1.14 使用sql创建Kakfa动态表

查拉图斯特拉你和他 2022-04-24 阅读 41
flink

需要的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- 
	发送和接收的JSON格式需要此依赖,可以改为flink-avro/csv 
	不能使用<scope>provided</scope>
-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
val kafkaSourceSql =
    """
      |CREATE TABLE kafkaTable (
      | json STRING
      |) WITH (
      |  'connector' = 'kafka',
      |  'topic' = 'flink',
      |  'properties.bootstrap.servers' = '124.220.2.188:9092',
      |  'properties.group.id' = 'testGroup',
      |  'scan.startup.mode' = 'earliest-offset',
      |  'format' = 'json',
      |  'json.fail-on-missing-field' = 'false',
      |  'json.ignore-parse-errors' = 'true'
      |)
      |""".stripMargin
tableEnv.executeSql(kafkaSourceSql)
tableEnv.sqlQuery("select * from kafkaTable").execute().print()
env.execute()

kafka输入测试(根据字段)

{"json":"message"}

根据建表语句

  1. 如果输入数据为json格式,没有"json"字段,返回空值
  2. 如果输入数据为json格式,包含json字段,只会返回json字段
  3. 不是json格式数据,不会解析
    flinkTable 打印结果

报错Could not find any factory for identifier ‘json’ that implements ‘org.apache.flink.table.factories.DeserializationFormatFactory’ in the classpath.

因为没有flink-json或flink-avro…依赖,或者依赖没有加入到classpath(使用provided)
Available factory identifiers are: 可以使用一下格式
raw

使用raw格式的建表语句

CREATE TABLE nginx_log (
  log STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'nginx_log',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'raw'
)

在这里插入图片描述

官网链接: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/overview/.

举报

相关推荐

0 条评论