Required dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!--
    Required for the JSON format used to send and receive messages;
    replace with flink-avro / flink-csv for those formats.
    Do not use <scope>provided</scope> here: the format jar must be on the runtime classpath.
-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
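import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object KafkaJsonSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Kafka source table: the column `json` is read from the top-level "json" field of each message
    val kafkaSourceSql =
      """
        |CREATE TABLE kafkaTable (
        |  json STRING
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'flink',
        |  'properties.bootstrap.servers' = '124.220.2.188:9092',
        |  'properties.group.id' = 'testGroup',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'format' = 'json',
        |  'json.fail-on-missing-field' = 'false',
        |  'json.ignore-parse-errors' = 'true'
        |)
        |""".stripMargin
    tableEnv.executeSql(kafkaSourceSql)

    // execute() submits the job itself; print() blocks and prints rows as they arrive.
    // env.execute() is not needed: no DataStream operators are defined, so it would fail
    // with "No operators defined in streaming topology".
    tableEnv.sqlQuery("select * from kafkaTable").execute().print()
  }
}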
Kafka input test (matched by field name):
{"json":"message"}
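Given the DDL above:
- If the input is JSON without a "json" field, the column is NULL (because 'json.fail-on-missing-field' = 'false');
- If the input is JSON with a "json" field, only that field is returned; any other fields are ignored;
- Input that is not valid JSON is not parsed and produces no row (because 'json.ignore-parse-errors' = 'true').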
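To reproduce the three cases above, one option is to send test messages with the plain Kafka producer API. A minimal sketch in Scala, assuming kafka-clients is on the classpath (flink-connector-kafka pulls it in transitively) and reusing the broker address and topic from kafkaTable; the object name SendTestRecords and the sample payloads are hypothetical.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object SendTestRecords {
  // Hypothetical payloads, one per case listed above
  val testRecords: Seq[String] = Seq(
    """{"json":"message"}""",  // has the "json" field: expected row with json = 'message'
    """{"other":"message"}""", // valid JSON without "json": expected row with json = NULL
    "plain text, not JSON"     // not JSON: expected to be skipped (ignore-parse-errors)
  )

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Assumption: same broker as the kafkaTable DDL
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "124.220.2.188:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    try {
      // Send each payload as the record value to the topic read by kafkaTable
      testRecords.foreach { value =>
        producer.send(new ProducerRecord[String, String]("flink", value))
      }
      producer.flush()
    } finally {
      producer.close()
    }
  }
}
```

With the Flink job from above running, the first two messages should each print one row and the third should print nothing.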
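Error: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.
Cause: there is no flink-json or flink-avro… dependency, or the dependency is not on the runtime classpath (e.g. it was declared with <scope>provided</scope>).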
The message then lists the formats that are available and can be used instead:
Available factory identifiers are:
raw
DDL using the raw format:
CREATE TABLE nginx_log (
  log STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'nginx_log',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'raw'
)
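Reading the raw table from Scala follows the same pattern as the JSON job. A minimal sketch, assuming the nginx_log DDL above and a broker at localhost:9092; the object name RawNginxLog is hypothetical. Since raw is the format the error message reports as available, this job should not need flink-json.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object RawNginxLog {
  // Same DDL as above: the whole Kafka message value goes into the single column `log`
  val rawDdl: String =
    """
      |CREATE TABLE nginx_log (
      |  log STRING
      |) WITH (
      |  'connector' = 'kafka',
      |  'topic' = 'nginx_log',
      |  'properties.bootstrap.servers' = 'localhost:9092',
      |  'properties.group.id' = 'testGroup',
      |  'format' = 'raw'
      |)
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    tableEnv.executeSql(rawDdl)
    // Each message arrives unparsed as a string; print() blocks while the stream runs
    tableEnv.executeSql("SELECT log FROM nginx_log").print()
  }
}
```

With raw, the message value is decoded as a string (UTF-8 by default) and the format supports only one physical column, so any field extraction has to happen later, in SQL or downstream code.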
Official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/overview/