import org.apache.flink.streaming.api.scala._
/**
 * Minimal Flink streaming job: reads a text file as a stream of lines and
 * prints each line with the `stream1` prefix.
 *
 * The file to read is taken from the first command-line argument. When no
 * argument is given, the job falls back to the sample file path that was
 * previously hard-coded, so running it without arguments behaves as before.
 */
object SourceTest2 {

  /** Sample input file used when no path is supplied on the command line. */
  private val DefaultInputPath = "D:\\Flink\\src\\main\\resources\\sensor.txt"

  /**
   * Builds the pipeline and submits it for execution.
   *
   * @param args optional; `args(0)` is the path of the text file to read
   */
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the job's default parallelism to 1
    env.setParallelism(1)

    // Prefer a caller-supplied path so the job is not tied to one machine's layout
    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    val lines: DataStream[String] = env.readTextFile(inputPath)
    lines.print("stream1")

    // Nothing runs until execute() is called; the string is the job name
    env.execute("source test job")
  }
}
其中，文件 D:\Flink\src\main\resources\sensor.txt 的内容如下：
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,37.2
sensor_1,1547718212,33.5
sensor_1,1547718215,38.1
执行输出结果:
stream1> sensor_1,1547718199,35.8
stream1> sensor_6,1547718201,15.4
stream1> sensor_7,1547718202,6.7
stream1> sensor_10,1547718205,38.1
stream1> sensor_1,1547718207,37.2
stream1> sensor_1,1547718212,33.5
stream1> sensor_1,1547718215,38.1