Deploying Flink 1.9.1 as a Standalone Cluster [Batch DataSet / Real-time DataStream]

王老师说 · 2022-10-28


1. Environment Preparation

1. Download: https://flink.apache.org



The Hadoop integration bundle offered further down the download page (the pre-bundled Hadoop jar) must be copied into Flink's lib directory.

2. Upload the archive to the Linux machine and extract it into the target directory:

tar -zxvf flink-1.9.1-… -C apps/

2. Standalone Deployment

2.1 Edit conf/flink-conf.yaml

# 1. Hostname of the master node (JobManager)
jobmanager.rpc.address: hadoop01
# 2. Number of task slots per TaskManager
taskmanager.numberOfTaskSlots: 2
# 3. For a plain standalone setup (no HA), the ZooKeeper quorum does not need to be configured

2.2 Edit conf/slaves

# Worker (TaskManager) nodes
hadoop02
hadoop03

2.3 Copy the installation to the other nodes

scp -r flink-1.9.1/ hadoop02:$PWD
scp -r flink-1.9.1/ hadoop03:$PWD

2.4 Start the cluster

bin/start-cluster.sh

2.5 Verify access

http://hadoop01:8081


2.6 Submitting a job jar through the web UI



Open the required socket port in advance, before performing the submission above.


2.7 Submitting a job jar from the command line

# --hostname hadoop01 --port 8888 are arguments passed to the program's main method
bin/flink run -m hadoop01:8081 -p 4 -c com.wang.Main /path/to/your.jar --hostname hadoop01 --port 8888
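
The trailing --hostname hadoop01 --port 8888 pairs are handed to the program's main method as ordinary arguments. Below is a minimal sketch of how the submitted class might read them using Flink's ParameterTool (the class name Main, the placeholder sink, and the fallback values are assumptions for illustration):

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Main {
    public static void main(String[] args) throws Exception {
        // Parse named arguments of the form --hostname <host> --port <port>
        ParameterTool params = ParameterTool.fromArgs(args);
        String hostname = params.get("hostname", "localhost"); // fallback is an assumption
        int port = params.getInt("port", 8888);                // fallback is an assumption

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream(hostname, port);
        lines.print(); // placeholder sink so the sketch runs end to end
        env.execute("Main");
    }
}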

3. Project Integration [Maven 3.x + JDK 8 / Scala 2.11]

3.1 POM dependencies

<dependencies>
    <!-- Dependencies for Java programs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.9.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.9.1</version>
        <scope>provided</scope>
    </dependency>

    <!-- Dependencies for Scala programs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.9.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.9.1</version>
        <scope>provided</scope>
    </dependency>

    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- Java compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- Scala compiler plugin -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.1.6</version>
            <configuration>
                <scalaCompatVersion>2.11</scalaCompatVersion>
                <scalaVersion>2.11.12</scalaVersion>
                <encoding>UTF-8</encoding>
            </configuration>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <!-- Assembly plugin: builds a fat jar containing all dependencies -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- Optional: set the jar's entry class -->
                        <mainClass>com.wang.flink.SocketWordCount</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

3.2 Real-time computation [DataStream]

1. Use socket communication to implement real-time word counting.
2. Open a socket port:

nc -lk 8888

3. Java program:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Create the Source
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        // 3. Transformations
        DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    // Emit each word to the collector
                    out.collect(word);
                }
            }
        });

        // 4. Pair each word with 1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        // 5. Group and aggregate: word -> count
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = wordAndOne.keyBy(0).sum(1);

        // 6. Sink: here we just print to the console
        sumed.print();

        // 7. Start the job
        env.execute("StreamingWordCount");
    }
}

4. Test result: words typed into the nc session show up in the job output as running (word, count) pairs.



5. Packaging and running on the cluster

Change the socket host and port in the program into command-line arguments, for example:

DataStream<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));

1) Web UI submission, with the socket port opened beforehand.



The test result matches the local run: the counts keep accumulating as new words arrive on the socket.



6. Optimizing with Java 8 lambda expressions

SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines
        .flatMap((String line, Collector<Tuple2<String, Integer>> out) ->
                Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1))))
        // Lambdas erase generic type information, so the result type must be declared explicitly
        .returns(Types.TUPLE(Types.STRING, Types.INT));
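
For reference, here is a complete runnable variant of the streaming job in lambda style; a sketch under the same environment assumptions as above (the class name LambdaWordCount is mine):

import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class LambdaWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("192.168.52.200", 8888)
           .flatMap((String line, Collector<Tuple2<String, Integer>> out) ->
                   Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1))))
           // Lambdas erase generic types, so declare the result type explicitly
           .returns(Types.TUPLE(Types.STRING, Types.INT))
           .keyBy(0)
           .sum(1)
           .print();

        env.execute("LambdaWordCount");
    }
}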

3.3 Batch computation [DataSet]

1. Prepare an input file with some sample data, such as:
flink flink spark hadoop
flink vue java
hdfs spark
2. Java program:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the input data
        DataSource<String> lines = env.readTextFile(args[0]);
        // 3. Split and flatten
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        // 4. Batch grouping uses groupBy(), not the streaming API's keyBy()
        AggregateOperator<Tuple2<String, Integer>> sumed = wordAndOne.groupBy(0).sum(1);

        // 5. Write the result; the sink parallelism controls how many output files are produced
        sumed.writeAsText(args[1]).setParallelism(2);
        // 6. Trigger execution
        env.execute("BatchWordCount");
    }
}
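
For a quick local sanity check without creating an input file, the same pipeline can read from an in-memory collection. A minimal sketch (the class name is mine; fromElements replaces readTextFile):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCountLocal {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // In-memory sample data instead of args[0]
        env.fromElements("flink flink spark hadoop", "flink vue java", "hdfs spark")
           .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
               @Override
               public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                   for (String word : line.split(" ")) {
                       out.collect(Tuple2.of(word, 1));
                   }
               }
           })
           .groupBy(0)
           .sum(1)
           // In the DataSet API, print() itself triggers execution, so env.execute() is not called
           .print();
    }
}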

