A few differences when using upsert-kafka
1. You must set .option("value.format", "json").
2. You cannot set .option("scan.startup.mode", "earliest-offset"); upsert-kafka does not accept this option.
3. You can pick out the rows you need via row.getKind(), for example keeping only the UPDATE_AFTER data.
4. The Kafka message key carries the primary-key (pk) information.
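For reference, points 1, 2, and 4 also show up when the same sink is declared in plain SQL. Below is a minimal sketch, not part of the original test: the table name UpsertExample is made up, and the topic/broker values are reused from the test code further down.

// Minimal SQL-DDL sketch of an upsert-kafka sink (UpsertExample is a made-up name).
// upsert-kafka requires a PRIMARY KEY plus key.format and value.format, rejects
// scan.startup.mode, and writes the key columns as the Kafka message key.
tableEnv.executeSql(
        "CREATE TABLE UpsertExample ("
        + "  valueKey INT,"
        + "  countNumber BIGINT,"
        + "  PRIMARY KEY (valueKey) NOT ENFORCED"
        + ") WITH ("
        + "  'connector' = 'upsert-kafka',"
        + "  'topic' = 'versioned_topic',"
        + "  'properties.bootstrap.servers' = '192.168.157.140:9092',"
        + "  'key.format' = 'json',"
        + "  'value.format' = 'json'"
        + ")");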
PS, further findings:
- In a Flink INSERT, the column list may name fewer fields than the SELECT produces, but the SELECT fields must match the columns defined on the table.
- If a query's WHERE clause is wrong (e.g. it references a field that does not exist), it can still execute but produces no output.
The complete test code follows:
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Schema.Builder;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.types.RowKind;

public class KafkaVersionTable {

    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Use asSerializableString() to declare a NOT NULL column.
        Builder schemaBuilder = Schema.newBuilder();
        schemaBuilder
                //.column("Kafkakey", new VarCharType(false, 20).asSerializableString())
                .column("valueKey", new IntType(false).asSerializableString())
                .column("value", DataTypes.STRING())
                .columnByMetadata("ts", DataTypes.TIMESTAMP(3), "timestamp")
                //.primaryKey("valueKey")
                .watermark("ts", "ts - INTERVAL '5' SECOND");

        TableDescriptor tableDescriptor = TableDescriptor.forConnector("kafka")
                .schema(schemaBuilder.build())
                .option("topic", "input_topic")
                .option("properties.bootstrap.servers", "192.168.157.140:9092")
                .option("properties.group.id", "testGroup")
                .option("scan.startup.mode", "earliest-offset")
                // .option("key.fields", "Kafkakey") // isn't this the Kafka key?
                // .option("key.format", "debezium-json")
                // .option("key.fields-prefix", "csv")
                .option("format", "csv")
                // .option("value.fields-include", "EXCEPT_KEY") // must be set to read the Kafka key
                .build();

        // Both createTable and createTemporaryTable work here, but createTable
        // leaves an unusable table behind in Hive?
        Builder versionSchemaBuilder = Schema.newBuilder();
        versionSchemaBuilder
                //.column("Kafkakey", new VarCharType(false, 20).asSerializableString())
                .column("valueKey", new IntType(false).asSerializableString())
                .column("countNumber", DataTypes.BIGINT())
                .column("ts", DataTypes.TIMESTAMP(3))
                // .columnByMetadata("ts", DataTypes.TIMESTAMP(3), "timestamp")
                // .watermark("ts", "ts - INTERVAL '5' SECOND")
                .primaryKey("valueKey");

        // Without value.format: org.apache.flink.table.api.ValidationException:
        // Could not find required sink format 'value.format'.
        TableDescriptor versionTableDescriptor = TableDescriptor.forConnector("upsert-kafka")
                .schema(versionSchemaBuilder.build())
                .option("topic", "versioned_topic")
                .option("properties.bootstrap.servers", "192.168.157.140:9092")
                .option("properties.group.id", "testGroup")
                // .option("scan.startup.mode", "earliest-offset") // not supported by upsert-kafka
                .option("key.format", "json")
                // .option("key.fields-prefix", "csv")
                .option("value.format", "json")
                // .option("value.fields-include", "EXCEPT_KEY") // must be set to read the Kafka key
                .build();

        try {
            // With createTable: org.apache.flink.table.api.ValidationException:
            // Could not execute CreateTable in path `myhive`.`default`.`FileTable`
            tableEnv.createTemporaryTable("KafkaTable", tableDescriptor);
            tableEnv.createTemporaryTable("KafkaVersionTable", versionTableDescriptor);

            // The INSERT column list may name fewer fields than the SELECT,
            // but the SELECT fields must match the table definition.
            // The Kafka key carries the primaryKey fields; a single message
            // after the INSERT looks like:
            // -- KEY   {"valueKey":19}
            // -- VALUE {"valueKey":19,"countNumber":5,"ts":"2022-01-13 14:05:19.07"}
            tableEnv.executeSql("INSERT INTO KafkaVersionTable(valueKey, countNumber, ts)"
                    + " SELECT valueKey, count(*) AS countNumber, CURRENT_TIMESTAMP AS ts"
                    + " FROM KafkaTable GROUP BY valueKey");
            // tableEnv.executeSql("INSERT INTO KafkaVersionTable VALUES (1, 100)");

            // A broken WHERE clause (e.g. where op='+U') yields no output instead of an error.
            Table table = tableEnv.sqlQuery("select * from KafkaVersionTable");
            //Table table = tableEnv.sqlQuery("select * from KafkaVersionTable where Row.ofKind='UPDATE_AFTER'");
            // .fetch(2);

            // Collect the changelog and print only the rows after an update;
            // row.getKind() alternates between UPDATE_BEFORE and UPDATE_AFTER.
            TableResult result = table.execute();
            result.collect().forEachRemaining(row -> {
                if (row.getKind() == RowKind.UPDATE_AFTER) {
                    System.out.println(row);
                }
            });
            // String s = result.getResolvedSchema().toString();
            // System.out.println(s);
            // result.print();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
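On the commented-out key.fields / value.fields-include options in the test above: the plain kafka connector can expose the message key as table columns by combining key.format, key.fields, and value.fields-include. The sketch below shows one way this could look; the column names kafkaKey and payload are illustrative assumptions, not part of the original test.

// Sketch: exposing the Kafka message key as a table column with the plain
// kafka connector. Column names kafkaKey/payload are illustrative assumptions.
TableDescriptor keyedDescriptor = TableDescriptor.forConnector("kafka")
        .schema(Schema.newBuilder()
                .column("kafkaKey", DataTypes.STRING()) // populated from the message key
                .column("payload", DataTypes.STRING())  // populated from the message value
                .build())
        .option("topic", "input_topic")
        .option("properties.bootstrap.servers", "192.168.157.140:9092")
        .option("scan.startup.mode", "earliest-offset")
        .option("key.format", "raw")                    // format decoding the key bytes
        .option("key.fields", "kafkaKey")               // columns taken from the key
        .option("value.format", "raw")
        .option("value.fields-include", "EXCEPT_KEY")   // value holds only non-key columns
        .build();

With EXCEPT_KEY, the value format decodes only the non-key columns, which matches the note in the test code that this option is needed before the Kafka key can be read as a column.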