Flink 1.10.1 Java WordCount demo (nc + socket)

一脸伟人痣 · 2022-01-26

1. Install nc on the Linux machine

yum install nc -y

Follow the prompts to complete the installation. Having nc available makes testing convenient.

2. Create a Maven project in IDEA and add the dependencies

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.10.1</flink.version>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.7</slf4j.version>
        <scala.version>2.11</scala.version>
        <!-- referenced by the Flink dependencies below; use compile for local runs, provided when packaging for a cluster -->
        <project.build.scope>compile</project.build.scope>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- Flink Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Streaming Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Web UI -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
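
The slf4j-log4j12 binding above needs a log4j configuration on the classpath, otherwise log4j warns that no appenders could be found and the Flink logs are dropped. A minimal src/main/resources/log4j.properties (my addition, not part of the original post) could look like this:

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n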

3. Write the Java code

package com.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * author:HUAWEI
 */
public class SocketWordCount {
    public static void main(String[] args) throws Exception {
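        // Create a local execution environment with the Flink Web UI enabled (http://localhost:8081 by default)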
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // The data source is a Linux host; the data is transferred over a socket
        DataStreamSource<String> socket = env.socketTextStream("192.168.0.181", 9000, "\n");
        lambda(socket);
        // function(socket);
        // lambdaAndFunction(socket);
        // richFunction(socket);
        env.execute();
    }

    // Rich function approach
    private static void richFunction(DataStreamSource<String> socket) {
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socket.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
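            // open() runs once per parallel subtask before any data is processed, e.g. to set up connections or state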
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
            }

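            // close() runs once per parallel subtask after the last record, e.g. to release resources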
            @Override
            public void close() throws Exception {
                super.close();
            }

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] strings = value.split(" ");
                for (String s : strings) {
                    out.collect(Tuple2.of(s, 1));
                }
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = flatMap.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        tuple2StringKeyedStream.sum(1).print();
    }

    // Mixed approach: anonymous function plus a lambda
    private static void lambdaAndFunction(DataStreamSource<String> socket) {
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socket.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] strings = value.split(" ");
                for (String s : strings) {
                    out.collect(Tuple2.of(s, 1));
                }

            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatMap.keyBy(f -> f.f0).sum(1);
        sum.print();
    }

    // Anonymous functions only
    private static void function(DataStreamSource<String> socket) {
        SingleOutputStreamOperator<String> flatMap = socket.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] strings = value.split(" ");
                for (String s : strings) {
                    out.collect(s);
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = map.keyBy("f0").sum(1);
        sum.print();
    }

    // Lambdas only
    private static void lambda(DataStreamSource<String> socket) {
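        // Java type erasure hides the lambdas' generic result types, so .returns() has to supply them explicitly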
        socket.flatMap((String value, Collector<String> out) -> {
            Arrays.stream(value.split(" ")).forEach(out::collect);
        }).returns(Types.STRING)
                .map(f -> Tuple2.of(f, 1)).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(0)
                .sum(1)
                .print();
    }

}

Note the socketTextStream arguments here: the IP is that of the server running nc, and the port is the port nc listens on.
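
Hard-coding the address works for a quick demo; a common alternative (a sketch of mine, not from the original post) is to pass the host and port as program arguments and read them with Flink's ParameterTool:

        // import org.apache.flink.api.java.utils.ParameterTool;
        ParameterTool params = ParameterTool.fromArgs(args);   // e.g. run with: --host 192.168.0.181 --port 9000
        String host = params.get("host", "localhost");         // default host if the argument is missing
        int port = params.getInt("port", 9000);                // default port if the argument is missing
        DataStreamSource<String> socket = env.socketTextStream(host, port, "\n");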

4. Start nc on the corresponding server

nc -l 9000
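
If your nc exits as soon as the first connection closes, most implementations also accept -k together with -l to keep the listener open across connections; the flags vary between nc variants, so check the local man page.

nc -lk 9000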

5. Run the Flink program

6. Run a test

Type a string into the nc session and press Enter.

Watch the program's console output.
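
For example, typing hello world hello should produce output roughly like the following (the leading number is the index of the printing subtask, so the exact prefixes depend on your parallelism and key distribution):

3> (hello,1)
5> (world,1)
3> (hello,2)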

 

View the running job in the browser via the Flink Web UI (http://localhost:8081 by default).

 
