目录导航
Flink 的API层级介绍Source Operator速览
-  Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象 -  第一层是最底层的抽象为有状态实时流处理,抽象实现是 Process Function,用于底层处理 
-  第二层抽象是 Core APIs,许多应用程序不需要使用到上述最底层抽象的 API,而是使用 Core APIs 进行开发 - 例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等,此层 API 中处理的数据类型在每种编程语言中都有其对应的类。
 
-  第三层抽象是 Table API。 是以表Table为中心的声明式编程API,Table API 使用起来很简洁但是表达能力差 - 类似数据库中关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等
- 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用
 
-  第四层最顶层抽象是 SQL,这层程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式 - SQL 抽象与 Table API 抽象之间的关联是非常紧密的
 
-  注意:Table和SQL层变动多,还在持续发展中,大致知道即可,核心是第一和第二层 
  
 
-  
-  Flink编程模型 

-  Source来源 -  元素集合 - env.fromElements
- env.fromColletion
- env.fromSequence(start,end);
 
-  文件/文件系统 - env.readTextFile(本地文件);
- env.readTextFile(HDFS文件);
 
-  基于Socket - env.socketTextStream(“ip”, 8888)
 
-  自定义Source,实现接口自定义数据源,rich相关的api更丰富 -  并行度为1 - SourceFunction
- RichSourceFunction
 
-  并行度大于1 - ParallelSourceFunction
- RichParallelSourceFunction
 
 
-  
 
-  
-  Connectors与第三方系统进行对接(用于source或者sink都可以) - Flink本身提供Connector例如kafka、RabbitMQ、ES等
- 注意:Flink程序打包一定要将相应的connetor相关类打包进去,不然就会失败
 
-  Apache Bahir连接器 - 里面也有kafka、RabbitMQ、ES的连接器更多
 
-  总结 和外部系统进行读取写入的 - 第一种 Flink 里面预定义的 source 和 sink。
- 第二种 Flink 内部也提供部分 Boundled connectors。
- 第三种是第三方 Apache Bahir 项目中的连接器。
- 第四种是通过异步 IO 方式 
    - 异步I/O是Flink提供的非常底层的与外部系统交互
 
 
Flink 预定义的Source 数据源 案例实战
- Source来源 
  - 元素集合 
    - env.fromElements
- env.fromColletion
- env.fromSequence(start,end);
 
 
- 元素集合 
    
 public static void main(String [] args) throws Exception {
        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //相同类型元素的数据流 source
        DataStream<String> stringDS1 = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");
        stringDS1.print("stringDS1");
        DataStream<String> stringDS2 = env.fromCollection(Arrays.asList("微服务项目大课,java","alibabacloud,rabbitmq","hadoop,hbase"));
        stringDS2.print("stringDS2");
        DataStreamSource<Long> longDS3 = env.fromSequence(0,10);
        longDS3.print("longDS3");
        //DataStream需要调用execute,可以取个名称
        env.execute("xdclass job");
    }
- 文件/文件系统 
  - env.readTextFile(本地文件);
- env.readTextFile(HDFS文件);
 
public static void main(String [] args) throws Exception {
        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> textDS = env.readTextFile("/Users/xdclass/Desktop/xdclass_access.log");
        //DataStream<String> textDS = env.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");
        textDS.print();
        env.execute("xdclass job");
}
- 基于Socket 
  - env.socketTextStream(“ip”, 8888)
 
   public static void main(String [] args) throws Exception {
        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);
        stringDataStream.print();
        env.execute(" job");
}
Flink自定义的Source 数据源案例-订单来源实战
-  自定义Source,实现接口自定义数据源 -  并行度为1 - SourceFunction
- RichSourceFunction
 
-  并行度大于1 - ParallelSourceFunction
- RichParallelSourceFunction
 
-  Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等 
 
-  
-  创建接口 
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
    private String tradeNo;
    private String title;
    private int money;
    private int userId;
    private Date createTime;
}
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {
	private volatile Boolean flag = true;
    private  Random random = new Random();
    private static List<String> list = new ArrayList<>();
    static {
        list.add("spring boot2.x课程");
        list.add("微服务SpringCloud课程");
        list.add("RabbitMQ消息队列");
        list.add("Kafka课程");
        list.add("Flink流式技术课程");
        list.add("工业级微服务项目大课训练营");
        list.add("Linux课程");
    }
    @Override
    public void run(SourceContext<VideoOrder> ctx) throws Exception {
        while (flag){
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            int videoNum = random.nextInt(list.size());
            String title = list.get(videoNum);
            ctx.collect(new VideoOrder(id,title,money,userId,new Date()));
        }
    }
    /**
     * 取消任务
     */
    @Override
    public void cancel() {
        flag = false;
    }
}
- 案例
public static void main(String [] args) throws Exception {
        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<VideoOrder> videoOrderDataStream = env.addSource(new VideoOrderSource());
        videoOrderDataStream.print();
        //DataStream需要调用execute,可以取个名称
        env.execute("custom source job");
    }
不断产生很多订单











