standalone模式
1.启动集群
bin/start-cluster.sh
2.web UI 界面介绍
(1)端口号:8081
(2)JobManager配置查看
(3)TaskManager信息
点击某个taskManager进入下面的页面:
(4)Web端提交job页面
add 拖进来打包好的jar包
3.standalone模式下提交任务
3.1 Web UI上提交
注意:下面代码设置并行度为1
public class Flink03_WordCount_Unbounded {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);//设置并行度为1
//参数工具类
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
int port = parameterTool.getInt("port");
//TODO 读取端口数据 无界流
DataStream<String> inputDataStream = env.socketTextStream(host, port);
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = inputDataStream
.flatMap(new Flink01_WordCount_Batch.MyFlatMapFunc())
.keyBy(0);
DataStream<Tuple2<String, Integer>> wordCountDataStream = keyedStream.sum(1);
wordCountDataStream.print("result");
env.execute("asd");
}
}
(1)提交任务首先需要将在IDEA中写好的flink程序进行打包
(2)如果我们在主程序中用了ParameterTool
工具类获取命令行参数,那么下面的命令行参数必须配置,否则无法提交
(3)show plan可以查看执行计划
- 当前job有两个task;
- 当前job只使用了1个slot;
将代码中的env.setParallelism(1);//设置并行度为1 删除,并在提交页面设置并行度为2再次提交
查看执行计划:
知识点1 flink source 并行度
Source是否可以并行取决于继承哪个SourceFunction,如果是ParallelSourceFunction
,就可以并行读取。
- Socket Source:是单并行度的,不可并行
- Kafka Source 可以并行读取,一个分区可以对应一个并行度来读取
知识点2 传输过程rebalance和hash
-
flink默认传输过程是rebalance,表示均衡,使用轮寻规则;
当上一个算子并行度和下一个算子并行度不同,就会使用负载均衡; -
hash 就是keyby算子的传输过程,根据key的hash值 % 并行度决定keyedStream中数据的流向哪一个下游并行度。
知识点3 job的task个数
- 从执行计划图来看:每个蓝色框的并行度的和就是当前job的task个数。
知识点4 slot个数
- 一个job占用的slot个数取决于所有蓝色框中最大的并行度
知识点5 可以看到数据从哪个Task溜走
由于并行度为1,只用到一个slot,因此只有一个taskManager中处理数据了:
3.2 Standalone模式,资源不足的问题
资源不足导致job阻塞的现象
-
当在配置文件中配置了每个taskManager的slots个数为2的时候,三台机器就是6个slots,此时提交1个并行度为4的job,就会占用4个slots,还剩两个slots。
-
如果再提交一个job的并行度>2,此时flink集群的可用slot个数变为0,由于slot个数不够用了,因此第二个job会阻塞等待。
现象查看
- 提交了第二个后,可用slots变为0,running job变成2,但是第二个job一直处于调度状态。
- 如果将第一个任务停了之后,资源释放了,那么第二个Job就能运行了
3.3 命令行提交Job
flink run
bin/flink run -c com.atguigu.wc.Flink03_WordCount_Unbounded -p 2 FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host hadoop102 --port 7777
bin/flink run -c 全类名
-p 并行度
jar包 [main方法的参数,写死了就不用给,通过参数工具需要给]
flink list
列出已经运行的flink job
flink cancel
flink cancel jobID 即可取消job