sensor.txt
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,37.2
sensor_1,1547718212,33.5
sensor_1,1547718215,38.1

Code
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, Schema}
/**
 * Reads data from a file, processes it, and then writes the result to Kafka.
 */
object KafkaTableTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    val filePath = "D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
    tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv()) // define how the file's data is parsed (CSV)
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      ) // define the table schema
      .createTemporaryTable("inputTable") // register the table
    // Transform the Table to obtain the result table
    val sensorTable: Table = tableEnv.from("inputTable")
    val resultTable: Table = sensorTable
      .select('id, 'temperature)
      .filter('id === "sensor_1")
    // Define an output table connected to Kafka
    tableEnv.connect(new Kafka()
      .version("0.11")
      .topic("sinkTest")
      .property("bootstrap.servers", "zjj101:9092")
      .property("zookeeper.connect", "zjj101:2181,zjj102:2181,zjj103:2181")
    )
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temp", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaOutputTable")
    // Emit the result table to the Kafka output table
    resultTable.insertInto("kafkaOutputTable")
    // Read the Kafka table back and print it, to verify the sink
    val table = tableEnv.sqlQuery("select id, temp from kafkaOutputTable")
    table.toAppendStream[(String, Double)].print("test")
    env.execute("kafka table test")
  }
}
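
For reference, the connect()/descriptor API used above was deprecated in Flink 1.11 in favor of SQL DDL. Below is a minimal sketch of the same Kafka sink declared via DDL, assuming Flink 1.11+ with the universal Kafka connector on the classpath; it is not part of the original code:

// Hypothetical alternative: declare the same sink with SQL DDL (Flink 1.11+).
tableEnv.executeSql(
  """CREATE TABLE kafkaOutputTable (
    |  id   STRING,
    |  temp DOUBLE
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'sinkTest',
    |  'properties.bootstrap.servers' = 'zjj101:9092',
    |  'format' = 'csv'
    |)""".stripMargin)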

Start the Kafka consumer
[root@zjj101 ~]# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server zjj101:9092 --topic sinkTest
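If the Flink job has already written to the topic before the consumer is started, add --from-beginning to replay the existing records:

[root@zjj101 ~]# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server zjj101:9092 --topic sinkTest --from-beginning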

Start the Flink program
IDEA console output:
test> (sensor_1,35.8)
test> (sensor_1,37.2)
test> (sensor_1,33.5)
test> (sensor_1,38.1)

Check the consumer console output
[root@zjj101 ~]# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server zjj101:9092 --topic  sinkTest
sensor_1,35.8
sensor_1,37.2
sensor_1,33.5
sensor_1,38.1

You can see that the filtered records have been written to the Kafka topic.
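One detail worth noting: the result table's second column is named temperature, while the sink schema declares it as temp. The job still runs (as the output above shows), because insertInto in this API version matches fields by position rather than by name. To make the mapping explicit, the column could be renamed in the query; this rename is a sketch, not part of the original code:

val resultTable: Table = sensorTable
  .select('id, 'temperature as 'temp) // rename to match the sink schema
  .filter('id === "sensor_1")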