An Introduction and Getting-Started Demo of the Table API and Flink SQL in Flink

互联网码农 · 2022-07-12

Maven Dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
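
Note that both planners are pulled in here: flink-table-planner_2.11 is the legacy planner, while flink-table-planner-blink_2.11 is the Blink planner. When both are on the classpath, you can tell the table environment explicitly which one to use via EnvironmentSettings; a minimal sketch, assuming Flink 1.10 and an existing StreamExecutionEnvironment:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Select the Blink planner in streaming mode
// (use useOldPlanner() for the legacy planner instead)
val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(env, settings)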

Contents of the sensor.txt file:

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,37.2
sensor_1,1547718212,33.5
sensor_1,1547718215,38.1

Table API

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object TableExample2 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism to 1
    env.setParallelism(1)

    // Read the file into a DataStream
    val inputStream: DataStream[String] = env
      .readTextFile("D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
    // Convert each line into the case class
    val dataStream: DataStream[SensorReading] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
      })

    // Create the table environment on top of the underlying streaming environment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Convert the DataStream into a Table and operate on it
    val dataTable: Table = tableEnv.fromDataStream(dataStream)

    // Call the Table API to obtain the result
    val resultTable: Table = dataTable
      .select("id, temperature")    // project these two fields
      .filter("id == 'sensor_1'")   // keep only rows whose id is sensor_1

    // Convert back to a DataStream and print
    val resultStream: DataStream[(String, Double)] = resultTable.toAppendStream[(String, Double)]
    resultStream.print("result sql")

    env.execute("table example job")
  }
}

Output:

result sql> (sensor_1,35.8)
result sql> (sensor_1,37.2)
result sql> (sensor_1,33.5)
result sql> (sensor_1,38.1)
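
Besides expression strings, the import org.apache.flink.table.api.scala._ also enables a typed Scala expression DSL in which fields are written as Scala symbols; a sketch of the same query, assuming the dataTable from the example above:

// The same projection and filter, written with the Scala expression DSL
val resultTable2: Table = dataTable
  .select('id, 'temperature)
  .filter('id === "sensor_1")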

Flink SQL

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object TableExample2 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism to 1
    env.setParallelism(1)

    // Read the file into a DataStream
    val inputStream: DataStream[String] = env
      .readTextFile("D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
    // Convert each line into the case class
    val dataStream: DataStream[SensorReading] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
      })

    // Create the table environment on top of the underlying streaming environment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Convert the DataStream into a Table
    val dataTable: Table = tableEnv.fromDataStream(dataStream)

    // Register the table as a temporary view named businessView
    tableEnv.createTemporaryView("businessView", dataTable)

    // Run a SQL query against the view
    val resultSqlTable: Table = tableEnv
      .sqlQuery("select id, temperature from businessView where id = 'sensor_1'")

    // Convert back to a DataStream and print
    val resultStream: DataStream[(String, Double)] = resultSqlTable.toAppendStream[(String, Double)]
    resultStream.print("result sql")

    env.execute("table example job")
  }
}

Output:

result sql> (sensor_1,35.8)
result sql> (sensor_1,37.2)
result sql> (sensor_1,33.5)
result sql> (sensor_1,38.1)
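
toAppendStream works in both examples because a projection plus filter only ever appends rows. A SQL query with an aggregation instead produces a continuously updated table and has to be converted with toRetractStream; a minimal sketch, assuming the businessView registered above (the cnt column name is just for illustration):

// Aggregating query: later rows may update earlier results
val aggTable: Table = tableEnv
  .sqlQuery("select id, count(id) as cnt from businessView group by id")

// The Boolean flag marks inserts (true) and retractions (false)
val aggStream: DataStream[(Boolean, (String, Long))] =
  aggTable.toRetractStream[(String, Long)]
aggStream.print("agg")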

Differences Between the Two

Under the hood, the Table API and Flink SQL are exactly the same; only the way you invoke them differs.
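
One way to see this for yourself is to print the query plan: in Flink 1.10 the table environment can explain any Table, whether it was built with the Table API or with sqlQuery; a sketch assuming the resultTable from the Table API example:

// Prints the abstract syntax tree and the optimized execution plan;
// the equivalent SQL query produces the same plan
println(tableEnv.explain(resultTable))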

Characteristics of the two approaches:

1. The Table API is more tightly embedded in the host language: if a step goes wrong, it is easier to step into the source code and find out why.

2. With Flink SQL, the error reported for a wrong SQL statement can be hard to understand; you can only guess from what the SQL parser reports and cannot step through the source code line by line. That is the drawback. The advantages are that SQL is more familiar to most programmers, the whole business logic is expressed within the SQL itself, and SQL is language-agnostic: Flink SQL and Spark SQL, for example, are both just SQL. In addition, Flink SQL supports more functions than the Table API, so some requirements can only be implemented with Flink SQL.
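
For instance, standard SQL constructs such as built-in functions and CASE expressions are available directly in a query; a sketch assuming the businessView from the Flink SQL example (the sensor and level column names are just for illustration):

// Built-in UPPER function plus a CASE expression in plain SQL
val labeledTable: Table = tableEnv.sqlQuery(
  """
    |select upper(id) as sensor,
    |       case when temperature > 30 then 'high' else 'normal' end as level
    |from businessView
  """.stripMargin)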

