0
点赞
收藏
分享

微信扫一扫

Flink简单使用手册

1kesou 2022-02-23 阅读 81

Flink简单使用教程

一、基础

1.1 环境配置

在pom.xml引入flink的相关依赖

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-java</artifactId>
	<version>1.11.0</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_2.11</artifactId>
	<version>1.11.0</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-clients_2.11</artifactId>
	<version>1.11.0</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_2.11</artifactId>
	<version>1.11.0</version>
</dependency>

如果已经引入了Kafka的依赖,为了避免flink和Kafka使用的scala版本不同导致的错误,需要在Kafka的依赖中排除掉对scala的依赖:

<exclusion>
	<groupId>org.scala-lang</groupId>
	<artifactId>scala-library</artifactId>
</exclusion>

1.2 基础概念

Flink是一个流批一体的数据处理平台。

自然环境中,数据的产生是流式的。无论是来自 Web 服务器的事件数据、证券交易所的交易数据,还是来自车间机器上的传感数据,其数据都是流式的。

分析数据时,可以围绕 有界流(bounded无界流(unbounded) 两种模型来处理数据。

批处理针对的是有界数据流,可以在计算结果输出之前输入整个数据集。因此,可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。

流处理针对的是无界数据流,理论上来说输入永远不会结束。因此,程序必须持续不断地对到达的数据进行处理。


1.3 什么能被转换成流?

Flink 的DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有:

  • 基本类型,即 String、Long、Integer、Boolean、Array
  • 复合类型:Tuples、POJOs 和 Scala case classes

Tuple就是一个元组,例如Tuple2就是一个二元组,如下是一个有两个属性的二元组实例:

在这里插入图片描述


1.4 DataStream

简单的来说就是需要处理的数据源,DataStream是flink程序中必不可少的一个类,该类用于表示数据集合,可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。

在用法上类似于常规的 Java集合,一旦它们被创建就不能添加或删除元素,也不能简单地察看内部元素,只能使用 DataStream API 操作来处理它们,这种通过DataStream API的操作也叫作转换(transformation)。


1.5 Flink程序基本构成

Flink 程序看起来像一个转换 DataStream 的常规程序。每个程序由相同的基本部分组成:

  1. 获取一个执行环境(StreamExecutionEnvironment);
  2. 加载/创建初始数据;
  3. 指定数据相关的转换;
  4. 指定计算结果的存储位置;
  5. 触发程序执行(Execute)。

StreamExecutionEnvironment 是所有 Flink 程序的基础。可以使用 StreamExecutionEnvironment 的静态方法获取 StreamExecutionEnvironment:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

通常,只需要使用 getExecutionEnvironment() ,该方法会根据上下文做正确的处理:如果你在 IDE 中执行你的程序或将其作为一般的 Java 程序执行,那么它将创建一个本地环境,该环境将在你的本地机器上执行你的程序。

如果你基于程序创建了一个 JAR 文件,并通过命令行运行它,Flink 集群管理器将执行程序的 main 方法,返回一个执行环境以在集群上执行你的程序。


1.6 Data Source

Source 是程序读取其输入的地方,通过 StreamExecutionEnvironment`可以访问多种预定义的 stream source:

基于文件:

  • readTextFile(path) - 读取文本文件,逐行读取并将它们作为字符串返回。
  • readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。

基于套接字:

  • socketTextStream - 从套接字读取。

基于集合:

  • fromCollection(Collection) - 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。
  • fromCollection(Iterator, Class) - 从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。
  • fromElements(T ...) - 从给定的对象序列中创建数据流。所有的对象必须属于同一类型。
  • fromParallelCollection(SplittableIterator, Class) - 从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。
  • generateSequence(from, to) - 基于给定间隔内的数字序列并行生成数据流。

自定义:

  • addSource - 关联一个新的 source function。例如,你可以使用 addSource(new FlinkKafkaConsumer<>(...)) 来从 Apache Kafka 获取数据。

1.7 Data Sick

Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系统或打印它们。

Flink 自带了多种内置的输出格式,这些格式相关的实现封装在 DataStreams 的算子里:

  • writeAsText() - 将元素按行写成字符串。
  • writeAsCsv(...) - 将元组写成逗号分隔值文件。
  • print() - 在标准输出流上打印每个元素的 toString() 值。
  • writeUsingOutputFormat() - 自定义文件输出的方法和基类。
  • writeToSocket - 根据 SerializationSchema 将元素写入套接字。
  • addSink - 调用自定义 sink function。Flink 捆绑了连接到其他系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。

二、流处理实例

2.1 通过年龄过滤出成年人和未成年人

public class Example {

    public static void main(String[] args) throws Exception {
        //1.创建flink的执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		//2.创建数据源
        DataStream<Person> flintstones = env.fromElements(
            new Person("Fred", 35),
            new Person("Wilma", 35),
            new Person("Pebbles", 2));
		//3.对数据进行过滤
        DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
            @Override
            public boolean filter(Person person) throws Exception {
                return person.age >= 18;
            }
        });
		//4.数据的处理方式(输出控制台)
        adults.print();
		//5.执行flink任务
        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;
        public Person() {}

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        }
    }
}

2.2 统计单词个数

基于流窗口的单词统计应用程序,计算 5 秒窗口内来自 Web 套接字的单词数

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
		//1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		//2.通过socket创建数据源,并进行过滤
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())//调用Splitter类按规则过滤输入字符串,返回的是一个个二元组(单词,单词出现次数)
                .keyBy(value -> value.f0) //按照单词进行排序
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))//限定窗口时间
                .sum(1);//1表示按照第2个位置的数字进行求和,即计算单词出现的次数
		//3.输出处理结果
        //需要注意的是,输出结果是源源不断的过程,首先env.execute执行,然后只要flink接收到数据,就会调用注释2.xx和3.xx的代			码,过滤数据,输出结果,中间这一部分代码会一直执行
        dataStream.print();
		//4.执行程序
        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            //通过空格对输入进行分组,每一个单词的属性值(出现次数)均为1
            //例如输入:a b c a b,输出为:a 2,b 2,c 1(a出现2次,b出现2次,c出现1次)
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

要运行示例程序,首先从终端使用 netcat (要下载相关软件)启动输入流:

nc -lk 9999

只需输入一些单词,然后按回车键即可传入新单词。


三、批处理实例

public class WordCount {
    
    public static void main(String[] args) throws Exception {
        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.通过文本文件(有界数据)创建数据源
        DataSource<String> dataSource = env.readTextFile("C:\\...")
        //3.处理输入
        DataSet<Tuple2<String,Integer>> dataSet =  dataSource
                .flatMap(new Splitter())//调用Splitter类按规则过滤输入字符串,返回的是一个个二元组(单词,单词出现次数)
                .groupBy(0) //0表示按照第1个位置的单词进行排序
                .sum(1);//1表示按照第2个位置的数字进行求和,即计算单词出现的次数
        //4.将处理结果写出到文件
        dataSet.writeAsCsv("C:\\...");
        //5.执行程序
        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            //通过空格对输入进行分组,每一个单词的属性值(出现次数)均为1
            //例如输入:a b c a b,输出为:a 2,b 2,c 1(a出现2次,b出现2次,c出现1次)
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

四、flink读写Kafka数据

测试时,首先运行消费者程序,再运行生产者

4.1 创建消费者类FlinkConsumer

public class WindowWordCount {

	public static void main(String[] args) throws Exception {
        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers","Kafka集群地址");
        //3.构造消费者
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
        //4.配置消费者
        DataStreamSource stream = env.addSource(consumer);
        //5.data sick
        stream.print();
        //6.执行程序
        env.execute("消费者程序");
    }
}

4.2 创建生产者类FlinkProducer

public class WindowWordCount {
    public static void main(String[] args) throws Exception {
        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers","Kafka集群地址");
        //3.构造生产者
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("topic", new SimpleStringSchema(), props);
        //4.配置数据源和生产者
        env.fromElement("hello", "flink", "kafka").addSink(producer);
        //5.执行程序
        env.execute("生产者程序");
    }
}

4.3 概念

Flink 的 Kafka consumer 称为 FlinkKafkaConsumer。它提供对一个或多个 Kafka topics 的访问。

构造函数接受以下参数:

  1. Topic 名称或者名称列表
  2. 用于反序列化 Kafka 数据的 DeserializationSchema 或者 KafkaDeserializationSchema
  3. Kafka 消费者的属性。需要以下属性:
  • “bootstrap.servers”(以逗号分隔的 Kafka broker 列表)
  • “group.id” 消费组 ID

Flink Kafka Producer 被称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。

构造器接收下列参数:

  1. 事件被写入的默认输出 topic
  2. 序列化数据写入 Kafka 的 SerializationSchema / KafkaSerializationSchema
  3. Kafka client 的 Properties。下列 property 是必须的:
    • “bootstrap.servers” (逗号分隔 Kafka broker 列表)
  4. 容错语义
举报

相关推荐

IDEA使用手册

Typora使用手册

Consul使用手册

npm使用手册

Docker使用手册

0 条评论