Writing Computed Results to a File with the Flink Table API and Flink SQL

dsysama · 2022-07-12

sensor.txt

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,37.2
sensor_1,1547718212,33.5
sensor_1,1547718215,38.1
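
The examples below assume a SensorReading case class matching this three-field format; it is not shown in the original listings, so this minimal definition is an inferred sketch:

// Assumed definition, inferred from the id,timestamp,temperature lines above
case class SensorReading(id: String, timestamp: Long, temperature: Double)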

Table API

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}


// Write the result of a Table computation to a file
object OutputTableTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the source file and create a DataStream
    val inputStream: DataStream[String] = env
      .readTextFile("D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
    val dataStream: DataStream[SensorReading] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
      })

    // Create the table environment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Convert the DataStream into a Table.
    // You can reorder the fields here, or rename them with as
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature as 'temp, 'timestamp as 'ts)

    // Transform the Table to get the result table
    val resultTable: Table = sensorTable
      .select('id, 'temp)
      .filter('id === "sensor_1")

    // An aggregating query produces an updating result, which the file
    // sink below cannot accept (see the note after this listing):
    // val aggResultTable: Table = sensorTable
    //   .groupBy('id)
    //   .select('id, 'id.count as 'count)

    // Define an output table; this is the TableSink the data is written to
    tableEnv.connect(new FileSystem()
        .path("D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\out2.txt"))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temp", DataTypes.DOUBLE()) // the declared fields must match the output data one-to-one
      )
      .createTemporaryTable("outputTable")

    // Write the result table into the table sink
    resultTable.insertInto("outputTable")

    env.execute("output table test")
  }
}
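
The aggregation left commented out above is not just an omission: a groupBy query produces an updating result, and the CSV file sink is append-only, so insertInto would fail for it. A minimal sketch for inspecting such a result instead, assuming the same sensorTable and tableEnv as above, converts it to a retract stream and prints it:

// Aggregated results update earlier rows, so they cannot go through
// the append-only file sink; print them as a retract stream instead.
val aggResultTable: Table = sensorTable
  .groupBy('id)
  .select('id, 'id.count as 'cnt)

// Each element is (flag, row): flag = true adds the row, false retracts it
tableEnv.toRetractStream[(String, Long)](aggResultTable).print()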

Output file contents

out2.txt

sensor_1,35.8
sensor_1,37.2
sensor_1,33.5
sensor_1,38.1

Flink SQL approach

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}


// Write the result of a SQL query to a file
object OutputTableTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the source file and create a DataStream
    val inputStream: DataStream[String] = env
      .readTextFile("D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
    val dataStream: DataStream[SensorReading] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
      })

    // Create the table environment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Convert the DataStream into a Table.
    // You can reorder the fields here, or rename them with as
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature as 'temp, 'timestamp as 'ts)

    // Query the Table with SQL to get the result table; concatenating the
    // Table into the SQL string registers it under an auto-generated name
    val resultTable: Table = tableEnv
      .sqlQuery("select id, temp from " + sensorTable + " where id = 'sensor_1'")

    // Define an output table; this is the TableSink the data is written to
    tableEnv.connect(new FileSystem()
        .path("D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\out3.txt"))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temp", DataTypes.DOUBLE()) // the declared fields must match the output data one-to-one
      )
      .createTemporaryTable("outputTable")

    // Write the result table into the table sink
    resultTable.insertInto("outputTable")

    env.execute("output table test")
  }
}

Output file contents

out3.txt

sensor_1,35.8
sensor_1,37.2
sensor_1,33.5
sensor_1,38.1
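
The string concatenation in sqlQuery works because the planner registers the Table under a generated name when it is rendered into the SQL string. A more explicit alternative, sketched here under the same setup with a hypothetical view name sensor, registers the Table as a named temporary view first:

// Register the Table under an explicit name, then refer to it by that name
tableEnv.createTemporaryView("sensor", sensorTable)
val resultTable: Table = tableEnv
  .sqlQuery("select id, temp from sensor where id = 'sensor_1'")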

