0
点赞
收藏
分享

微信扫一扫

Flink中Table表和DataStream流的互相转换

伢赞 2022-07-12 阅读 81

sensor.txt

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,37.2
sensor_1,1547718212,33.5
sensor_1,1547718215,38.1

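转换代码（代码中用到的 SensorReading 是一个样例类，需要事先定义：case class SensorReading(id: String, timestamp: Long, temperature: Double)）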

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}


/**
 * Demonstrates round-tripping between a Flink `DataStream` and a `Table`.
 *
 * Reads comma-separated sensor readings (`id,timestamp,temperature`) from a text file,
 * turns the resulting stream into a table with columns `id`, `temp` and `ts`, converts
 * that table back into an append-only stream of tuples and prints every record.
 *
 * The input file is taken from the first command-line argument; when no argument is
 * given the original tutorial location is used.
 */
object OutputTableTest2 {

  // Fallback input file used when no path is supplied on the command line.
  private val DefaultInputPath: String =
    "D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt"

  /**
   * Builds and runs the stream -> table -> stream job.
   *
   * @param args optional; `args(0)` is the path of the sensor file to read
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the raw lines of the input file as a DataStream.
    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    // Parse every non-blank line into a SensorReading. Blank lines are skipped and
    // fields are trimmed so stray whitespace does not break the numeric conversions.
    val dataStream: DataStream[SensorReading] = inputStream
      .filter(line => line.trim.nonEmpty)
      .map(line => {
        val fields = line.split(",").map(_.trim)
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
      })

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Stream -> table: select the fields by name, reordering them and renaming
    // temperature to temp and timestamp to ts.
    val sensorTable: Table =
      tableEnv.fromDataStream(dataStream, 'id, 'temperature as 'temp, 'timestamp as 'ts)

    // Table -> stream: the tuple type follows the table's column order (id, temp, ts).
    val resultStream: DataStream[(String, Double, Long)] =
      sensorTable.toAppendStream[(String, Double, Long)]

    // Print the converted records.
    resultStream.print()

    env.execute("output table test")
  }
}

输出结果

可以发现输出的数据与输入的数据一致，只是字段顺序按照转换时指定的 id、temp、ts 重新排列了（温度在前，时间戳在后）。

(sensor_1,35.8,1547718199)
(sensor_6,15.4,1547718201)
(sensor_7,6.7,1547718202)
(sensor_10,38.1,1547718205)
(sensor_1,37.2,1547718207)
(sensor_1,33.5,1547718212)
(sensor_1,38.1,1547718215)


举报

相关推荐

Flink中的批和流

0 条评论