sensor.txt内容
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,37.2
sensor_1,1547718212,33.5
sensor_1,1547718215,38.1
Flink定义表结构
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors._
/**
 * Demo job that registers a CSV file as a temporary table via the Table API
 * connector descriptors and prints every row of that table.
 *
 * The table schema declared below has three columns:
 * `id` (STRING), `timestamp` (BIGINT) and `temperature` (DOUBLE).
 */
object TableApiTest2 {

  /** Input file used when no path is supplied on the command line. */
  private val DefaultFilePath: String =
    "D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt"

  /**
   * Entry point.
   *
   * @param args optional command-line arguments; when present, `args(0)` is
   *             used as the path of the CSV file to read, otherwise
   *             [[DefaultFilePath]] is used
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv = StreamTableEnvironment.create(env)

    // Take the input location from the first argument so the job is not tied
    // to one machine's directory layout; fall back to the original path.
    val filePath = args.headOption.getOrElse(DefaultFilePath)

    // Read data from an external system and register it as a table in the
    // table environment: connect to the file system, reading CSV.
    tableEnv
      .connect(new FileSystem().path(filePath))
      .withFormat(new Csv()) // format applied to the data after it is read
      .withSchema(
        new Schema()
          .field("id", DataTypes.STRING())
          .field("timestamp", DataTypes.BIGINT())
          .field("temperature", DataTypes.DOUBLE())
      ) // table structure
      .createTemporaryTable("inputTable") // register the table under this name

    // Look the registered table up by name and print each row as a tuple.
    val table: Table = tableEnv.from("inputTable")
    table.toAppendStream[(String, Long, Double)].print()

    env.execute("table api test job")
  }
}
输出结果
(sensor_1,1547718199,35.8)
(sensor_6,1547718201,15.4)
(sensor_7,1547718202,6.7)
(sensor_10,1547718205,38.1)
(sensor_1,1547718207,37.2)
(sensor_1,1547718212,33.5)
(sensor_1,1547718215,38.1)