Testing Flink upsert-kafka and versioned tables

船长_Kevin · 2022-01-14

A few ways upsert-kafka differs from the plain kafka connector:
1. .option("value.format", "json") is mandatory; the sink requires a value format.
2. .option("scan.startup.mode", "earliest-offset") cannot be used.
3. row.getKind() lets you pick out just the rows you need, e.g. only the UPDATE_AFTER rows.
4. The primary-key fields are written into the Kafka message key (an equivalent SQL DDL sketch follows this list).
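
For reference, the same upsert-kafka sink can also be declared with SQL DDL instead of the TableDescriptor API used in the test code below. This is a minimal sketch (the table name KafkaVersionTableDdl is made up; topic and broker mirror the test code); note the mandatory PRIMARY KEY ... NOT ENFORCED clause and the key.format/value.format pair:

	// DDL equivalent of the upsert-kafka descriptor built in the test code.
	// scan.startup.mode is intentionally absent: upsert-kafka rejects it.
	tableEnv.executeSql(
			"CREATE TABLE KafkaVersionTableDdl ("
			+ "  valueKey INT NOT NULL,"
			+ "  countNumber BIGINT,"
			+ "  ts TIMESTAMP(3),"
			+ "  PRIMARY KEY (valueKey) NOT ENFORCED"
			+ ") WITH ("
			+ "  'connector' = 'upsert-kafka',"
			+ "  'topic' = 'versioned_topic',"
			+ "  'properties.bootstrap.servers' = '192.168.157.140:9092',"
			+ "  'key.format' = 'json',"
			+ "  'value.format' = 'json'"
			+ ")");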

PS — other findings:

  1. A Flink INSERT may list fewer columns than the SELECT produces, but the SELECT fields must match the table definition.
  2. If the WHERE clause of a query is broken (e.g. it references a field that does not exist), the query can still run but produce no output.

The complete test code is below:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.types.RowKind;

public class KafkaVersionTable {

	public static void main(String[] args) {
		EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
		TableEnvironment tableEnv = TableEnvironment.create(settings);

		// Use asSerializableString() to declare a NOT NULL column.
		Schema.Builder schemaBuilder = Schema.newBuilder();
		schemaBuilder
				//.column("Kafkakey", new VarCharType(false, 20).asSerializableString())
				.column("valueKey", new IntType(false).asSerializableString())
				.column("value", DataTypes.STRING())
				.columnByMetadata("ts", DataTypes.TIMESTAMP(3), "timestamp")
				//.primaryKey("valueKey")
				.watermark("ts", "ts - INTERVAL '5' SECOND");

		// Source table: plain kafka connector reading CSV records.
		TableDescriptor tableDescriptor = TableDescriptor.forConnector("kafka")
				.schema(schemaBuilder.build())
				.option("topic", "input_topic")
				.option("properties.bootstrap.servers", "192.168.157.140:9092")
				.option("properties.group.id", "testGroup")
				.option("scan.startup.mode", "earliest-offset")
				//.option("key.fields", "Kafkakey") // isn't this the kafka key?
				//.option("key.format", "debezium-json")
				//.option("value.fields-include", "EXCEPT_KEY") // must be set to read the Kafka key
				.option("format", "csv")
				.build();
		// Both createTable and createTemporaryTable work, but createTable also
		// creates an unusable table in Hive(?).

		// Sink table: upsert-kafka needs a primary key; without value.format it fails with
		// ValidationException: Could not find required sink format 'value.format'.
		Schema.Builder versionSchemaBuilder = Schema.newBuilder();
		versionSchemaBuilder
				.column("valueKey", new IntType(false).asSerializableString())
				.column("countNumber", DataTypes.BIGINT())
				.column("ts", DataTypes.TIMESTAMP(3))
				.primaryKey("valueKey");

		TableDescriptor versionTableDescriptor = TableDescriptor.forConnector("upsert-kafka")
				.schema(versionSchemaBuilder.build())
				.option("topic", "versioned_topic")
				.option("properties.bootstrap.servers", "192.168.157.140:9092")
				.option("properties.group.id", "testGroup")
				//.option("scan.startup.mode", "earliest-offset") // not supported by upsert-kafka
				.option("key.format", "json")
				.option("value.format", "json")
				.build();

		try {
			// createTable failed here with:
			// ValidationException: Could not execute CreateTable in path `myhive`.`default`.`FileTable`
			tableEnv.createTemporaryTable("KafkaTable", tableDescriptor);
			tableEnv.createTemporaryTable("KafkaVersionTable", versionTableDescriptor);

			// The INSERT column list may be shorter than the SELECT list, but the
			// SELECT fields must match the table definition.
			// The Kafka key carries the primary key; a single message looks like:
			// -- KEY   {"valueKey":19}
			// -- VALUE {"valueKey":19,"countNumber":5,"ts":"2022-01-13 14:05:19.07"}
			tableEnv.executeSql("INSERT INTO KafkaVersionTable(valueKey, countNumber, ts)"
					+ " SELECT valueKey, count(*) AS countNumber, CURRENT_TIMESTAMP AS ts"
					+ " FROM KafkaTable GROUP BY valueKey");

			// A broken WHERE clause (e.g. where op='+U') yields no output instead of an error.
			Table table = tableEnv.sqlQuery("SELECT * FROM KafkaVersionTable");

			// Collect the changelog and print only the post-update rows;
			// row.getKind() is UPDATE_BEFORE or UPDATE_AFTER here.
			TableResult result = table.execute();
			result.collect().forEachRemaining(row -> {
				if (row.getKind().equals(RowKind.UPDATE_AFTER)) {
					System.out.println(row.toString());
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
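
To double-check point 4 above (the primary key ends up in the Kafka message key), the sink topic can be read back with a plain kafka-clients consumer. A minimal sketch; the class name, group id, and poll timeout are arbitrary choices here, and a null value would be an upsert-kafka tombstone (DELETE):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class VerifyUpsertKey {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "192.168.157.140:9092");
		props.put("group.id", "verifyGroup");
		props.put("auto.offset.reset", "earliest");
		props.put("key.deserializer", StringDeserializer.class.getName());
		props.put("value.deserializer", StringDeserializer.class.getName());

		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
			consumer.subscribe(Collections.singletonList("versioned_topic"));
			// Expected per message, matching the comment in the test code:
			// KEY {"valueKey":19}  VALUE {"valueKey":19,"countNumber":5,...}
			for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(10))) {
				System.out.println("KEY " + record.key() + "  VALUE " + record.value());
			}
		}
	}
}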