The Flink Table Kafka connector does not expose the Kafka record key by default. To read it, the key columns have to be declared with key.fields / key.format and the table needs .option("value.fields-include", "EXCEPT_KEY"), so that the key columns are no longer expected inside the value. Complete example code:
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class KafkaKeyAndValueTable {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        //
        // String name = "myhive";
        // String defaultDatabase = "default";
        // String hiveConfDir = "D:\\temp\\hive\\conf";
        //
        // HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        // tableEnv.registerCatalog("myhive", hive);
        //
        // // set the HiveCatalog as the current catalog of the session
        // tableEnv.useCatalog("myhive");
        // Schema: Kafkakey comes from the Kafka record key, valueKey/value from the record value,
        // and ts is a metadata column mapped from the Kafka record timestamp.
        Schema.Builder schemaBuilder = Schema.newBuilder();
        schemaBuilder.column("Kafkakey", DataTypes.STRING())
                .column("valueKey", DataTypes.INT())
                .column("value", DataTypes.STRING())
                .columnByMetadata("ts", DataTypes.TIMESTAMP(), "timestamp");
        // Cross-connector operations cannot use the Hive dialect, so switch back to the default dialect.
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        //CsvOutputFormat.initDefaultsFromConfiguration(settings.toConfiguration());
        // The csv format dependency (flink-csv) must be added to the pom.
        TableDescriptor tableDescriptor = TableDescriptor.forConnector("kafka")
                .schema(schemaBuilder.build())
                .option("topic", "versioned_behavior")
                .option("properties.bootstrap.servers", "192.168.157.140:9092")
                .option("properties.group.id", "testGroup")
                .option("scan.startup.mode", "earliest-offset")
                .option("key.fields", "Kafkakey")
                .option("key.format", "csv")
                // .option("key.fields-prefix", "csv")
                .option("format", "csv")
                .option("value.fields-include", "EXCEPT_KEY") // this option must be set to read the Kafka key
                .build();
        // Both createTable and createTemporaryTable work, but createTable would create an unusable table in Hive?
        try {
            // Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not execute CreateTable in path `myhive`.`default`.`FileTable`
            tableEnv.createTemporaryTable("KafkaVersionTable", tableDescriptor);
            Table table = tableEnv.sqlQuery("select * from KafkaVersionTable");
            //        .fetch(2);
            // Execute the query and print the rows.
            TableResult result = table.execute();
            result.print();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
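
For comparison, the same table can also be declared with a SQL DDL instead of a TableDescriptor. The sketch below is not from the original example: the class name KafkaKeyAndValueSql is made up here, and the topic, broker address, and column names simply reuse the ones above. The key point is the same pair of options, 'key.fields' plus 'value.fields-include' = 'EXCEPT_KEY'.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaKeyAndValueSql {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // DDL equivalent of the TableDescriptor above: Kafkakey is filled from the Kafka record key,
        // and 'value.fields-include' = 'EXCEPT_KEY' keeps the key column out of the value format.
        tableEnv.executeSql(
                "CREATE TEMPORARY TABLE KafkaVersionTable (\n"
                + "  Kafkakey STRING,\n"
                + "  valueKey INT,\n"
                + "  `value` STRING,\n"
                + "  ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'versioned_behavior',\n"
                + "  'properties.bootstrap.servers' = '192.168.157.140:9092',\n"
                + "  'properties.group.id' = 'testGroup',\n"
                + "  'scan.startup.mode' = 'earliest-offset',\n"
                + "  'key.format' = 'csv',\n"
                + "  'key.fields' = 'Kafkakey',\n"
                + "  'format' = 'csv',\n"
                + "  'value.fields-include' = 'EXCEPT_KEY'\n"
                + ")");
        // Read the table back and print the rows.
        tableEnv.executeSql("select * from KafkaVersionTable").print();
    }
}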