flink的安装包我们前面已经下载好了,上传到集群,解压,我这里命名成flink-on-yarn
现在flink的软连接指向
我们改成指向flink-on-yarn
修改hadoop的配置文件的yarn-siite.xml
<property>
<name>yarn.resourcemanager.am-attempts</name>
<value>4</value>
</property>
把该配置文件分发给其他节点
scp -r yarn-site.xml hadoop@slave1:/opt/modules/hadoop-2.8.5/etc/hadoop/
scp -r yarn-site.xml hadoop@slave2:/opt/modules/hadoop-2.8.5/etc/hadoop/
以上参数都是默认的,我们可以不用配置
#yarn
yarn.maximum-failed-containers: 99999
high-availability: zookeeper
high-availability.zookeeper.quorum: master:2181,slave1:2181,slave2:2181
high-availability.zookeeper.path.root: /flink-on-yarn
high-availability.storageDir: hdfs://ns/flink/recovery
yarn.application-attempt: 10
#akka config
akka.watch.heartbeat.interval: 5s
akka.watch.heartbeat.pause: 20s
akka.ask.timeout: 60s
akka.framesize: 20971520b
state.backend: rocksdb
state.checkpoints.dir: hdfs://ns/flink/checkpoint
state.savepoints.dir: hdfs://ns/flink/savepoint
#java
env.java.opts: -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
yarn.heap-cutoff-ratio: 0.2
taskmanager.memory.off-heap: true
上传lib包
我这里已经启动好了
报错了
2021-04-06 10:55:37,827 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1617674927376_0001
2021-04-06 10:55:38,314 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1617674927376_0001
2021-04-06 10:55:38,314 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2021-04-06 10:55:38,317 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2021-04-06 10:55:59,747 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli - Error while running the Flink Yarn session.
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:387)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:610)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$3(FlinkYarnSessionCli.java:838)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:838)
Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1617674927376_0001 failed 2 times in previous 60000 milliseconds due to AM Container for appattempt_1617674927376_0001_000002 exited with exitCode: -103
Failing this attempt.Diagnostics: Container [pid=44553,containerID=container_1617674927376_0001_02_000001] is running beyond virtual memory limits. Current usage: 495.2 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container.
Dump of the process-tree for container_1617674927376_0001_02_000001 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 44565 44553 44553 44553 (java) 12 48 2202017792 126574 /opt/modules/jdk1.8.0_221/bin/java -Xms424m -Xmx424m -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError -Dlog.file=/opt/modules/hadoop-2.8.5/logs/userlogs/application_1617674927376_0001/container_1617674927376_0001_02_000001/jobmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint
|- 44553 44551 44553 44553 (bash) 0 0 115851264 185 /bin/bash -c /opt/modules/jdk1.8.0_221/bin/java -Xms424m -Xmx424m -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError -Dlog.file=/opt/modules/hadoop-2.8.5/logs/userlogs/application_1617674927376_0001/container_1617674927376_0001_02_000001/jobmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint 1> /opt/modules/hadoop-2.8.5/logs/userlogs/application_1617674927376_0001/container_1617674927376_0001_02_000001/jobmanager.out 2> /opt/modules/hadoop-2.8.5/logs/userlogs/application_1617674927376_0001/container_1617674927376_0001_02_000001/jobmanager.err
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
For more detailed output, check the application tracking page: http://master:8088/cluster/app/application_1617674927376_0001 Then click on links to logs of each attempt.
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1617674927376_0001
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1027)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:509)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:380)
... 7 more
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:387)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:610)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$3(FlinkYarnSessionCli.java:838)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:838)
Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1617674927376_0001 failed 2 times in previous 60000 milliseconds due to AM Container for appattempt_1617674927376_0001_000002 exited with exitCode: -103
Failing this attempt.Diagnostics: Container [pid=44553,containerID=container_1617674927376_0001_02_000001] is running beyond virtual memory limits. Current usage: 495.2 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container.
Dump of the process-tree for container_1617674927376_0001_02_000001 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 44565 44553 44553 44553 (java) 12 48 2202017792 126574 /opt/modules/jdk1.8.0_221/bin/java -Xms424m -Xmx424m -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError -Dlog.file=/opt/modules/hadoop-2.8.5/logs/userlogs/application_1617674927376_0001/container_1617674927376_0001_02_000001/jobmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint
|- 44553 44551 44553 44553 (bash) 0 0 115851264 185 /bin/bash -c /opt/modules/jdk1.8.0_221/bin/java -Xms424m -Xmx424m -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError -Dlog.file=/opt/modules/hadoop-2.8.5/logs/userlogs/application_1617674927376_0001/container_1617674927376_0001_02_000001/jobmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint 1> /opt/modules/hadoop-2.8.5/logs/userlogs/application_1617674927376_0001/container_1617674927376_0001_02_000001/jobmanager.out 2> /opt/modules/hadoop-2.8.5/logs/userlogs/application_1617674927376_0001/container_1617674927376_0001_02_000001/jobmanager.err
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
For more detailed output, check the application tracking page: http://master:8088/cluster/app/application_1617674927376_0001 Then click on links to logs of each attempt.
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1617674927376_0001
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1027)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:509)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:380)
... 7 more
2021-04-06 10:55:59,757 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cancelling deployment from Deployment Failure Hook
2021-04-06 10:55:59,758 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Killing YARN application
2021-04-06 10:55:59,763 INFO org.apache.hadoop.io.retry.RetryInvocationHandler - Exception while invoking ApplicationClientProtocolPBClientImpl.forceKillApplication over rm1. Trying to failover immediately.
java.io.IOException: The client is stopped
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1519)
at org.apache.hadoop.ipc.Client.call(Client.java:1381)
at org.apache.hadoop.ipc.Client.call(Client.java:1345)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy7.forceKillApplication(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.forceKillApplication(ApplicationClientProtocolPBClientImpl.java:213)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
at com.sun.proxy.$Proxy8.forceKillApplication(Unknown Source)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.killApplication(YarnClientImpl.java:439)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.killApplication(YarnClientImpl.java:419)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.failSessionDuringDeployment(AbstractYarnClusterDescriptor.java:1204)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.access$200(AbstractYarnClusterDescriptor.java:111)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor$DeploymentFailureHook.run(AbstractYarnClusterDescriptor.java:1500)
2021-04-06 10:55:59,764 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm2
2021-04-06 10:55:59,773 WARN org.apache.hadoop.ipc.Client - Failed to connect to server: slave1/192.168.215.162:8032: retries get failed due to exceeded maximum allowed retries number: 0
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:685)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:788)
at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:410)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1550)
at org.apache.hadoop.ipc.Client.call(Client.java:1381)
at org.apache.hadoop.ipc.Client.call(Client.java:1345)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy7.forceKillApplication(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.forceKillApplication(ApplicationClientProtocolPBClientImpl.java:213)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
at com.sun.proxy.$Proxy8.forceKillApplication(Unknown Source)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.killApplication(YarnClientImpl.java:439)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.killApplication(YarnClientImpl.java:419)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.failSessionDuringDeployment(AbstractYarnClusterDescriptor.java:1204)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.access$200(AbstractYarnClusterDescriptor.java:111)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor$DeploymentFailureHook.run(AbstractYarnClusterDescriptor.java:1500)
2021-04-06 10:55:59,774 INFO org.apache.hadoop.io.retry.RetryInvocationHandler - Exception while invoking ApplicationClientProtocolPBClientImpl.forceKillApplication over rm2 after 1 failover attempts. Trying to failover after sleeping for 32743ms.
java.net.ConnectException: Call From master/192.168.215.161 to slave1:8032 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:801)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493)
at org.apache.hadoop.ipc.Client.call(Client.java:1435)
at org.apache.hadoop.ipc.Client.call(Client.java:1345)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy7.forceKillApplication(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.forceKillApplication(ApplicationClientProtocolPBClientImpl.java:213)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
at com.sun.proxy.$Proxy8.forceKillApplication(Unknown Source)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.killApplication(YarnClientImpl.java:439)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.killApplication(YarnClientImpl.java:419)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.failSessionDuringDeployment(AbstractYarnClusterDescriptor.java:1204)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.access$200(AbstractYarnClusterDescriptor.java:111)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor$DeploymentFailureHook.run(AbstractYarnClusterDescriptor.java:1500)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:685)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:788)
at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:410)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1550)
at org.apache.hadoop.ipc.Client.call(Client.java:1381)
... 20 more
2021-04-06 10:56:32,517 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm1
出现此类错误,主要的原因是Current usage: 75.1 MB of 1 GB physical memory used; 2.1 GB of 2.1 GB virtual memor
y used. Killing container.
字面原因是容器内存不够,实际上是flink on yarn启动时检查虚拟内存造成的
所以修改配置文件,让它不检查就没事了
修改etc/hadoop/yarn-site.xml
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
重启一下hadoop查看running状态下的任务
scala> val dataSet=benv.fromElements("flink flink flink","spark spark spark")
dataSet: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@672ba9cc
scala> val counts=dataSet.flatMap(_.toLowerCase.split("\\W+")).filter(_.nonEmpty).map((_,1)).groupBy(0).sum(1)
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@580581b4
scala> counts.print()
(flink,3)
(spark,3)