0
点赞
收藏
分享

微信扫一扫

flink table 读取kafka key

飞进科技 2022-01-14 阅读 114

flink table 默认不会读取 kafka key。要读取 key,除了配置 key.format 和 key.fields 之外,还需要使用 .option("value.fields-include", "EXCEPT_KEY") 把 key 字段从 value 中排除,完整示意代码如下:

/**
 * Minimal example that reads both the Kafka record key and the record value
 * through the Flink Table API.
 *
 * <p>The table is declared with the {@code kafka} connector. The column listed
 * in {@code key.fields} ({@code Kafkakey}) is decoded from the record key using
 * {@code key.format}; {@code value.fields-include = EXCEPT_KEY} tells the
 * connector that the key column is not part of the value payload, so the
 * remaining physical columns are decoded from the value using {@code format}.
 */
public class KafkaKeyAndValueTable {

	/**
	 * Registers a temporary Kafka-backed table, runs {@code select *} against it
	 * and prints the resulting rows to standard output.
	 *
	 * <p>Failures are no longer caught and printed: any exception raised while
	 * creating the table or running the query propagates out of {@code main},
	 * so the JVM reports the stack trace and exits with a non-zero status
	 * instead of silently exiting with status 0.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

		// Original author's note: cross-connector operations cannot use a
		// non-default dialect, so the default SQL dialect is selected explicitly.
		tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

		// Physical columns plus one metadata column ("ts") that is filled from
		// the connector's "timestamp" metadata key.
		Schema schema = Schema.newBuilder()
				.column("Kafkakey", DataTypes.STRING())
				.column("valueKey", DataTypes.INT())
				.column("value", DataTypes.STRING())
				.columnByMetadata("ts", DataTypes.TIMESTAMP(), "timestamp")
				.build();

		// Original author's note: the csv format dependency must be declared in
		// the pom for the "csv" key/value formats below to be available.
		TableDescriptor kafkaSource = TableDescriptor.forConnector("kafka")
				.schema(schema)
				.option("topic", "versioned_behavior")
				.option("properties.bootstrap.servers", "192.168.157.140:9092")
				.option("properties.group.id", "testGroup")
				.option("scan.startup.mode", "earliest-offset")
				.option("key.fields", "Kafkakey")
				.option("key.format", "csv")
				.option("format", "csv")
				// This option must be set in order to read the Kafka key.
				.option("value.fields-include", "EXCEPT_KEY")
				.build();

		// Original author's note: createTable and createTemporaryTable both
		// work here, but with a Hive catalog as the current catalog createTable
		// appeared to leave an unusable table in Hive and was seen to fail with
		// "ValidationException: Could not execute CreateTable in path
		// `myhive`.`default`.`FileTable`" (unverified), so a temporary table is used.
		tableEnv.createTemporaryTable("KafkaVersionTable", kafkaSource);

		Table table = tableEnv.sqlQuery("select *  from KafkaVersionTable");

		// Submit the query and print the rows as they arrive.
		TableResult result = table.execute();
		result.print();
	}

}
举报

相关推荐

0 条评论