<configuration>
  <!-- Auxiliary services run by the NodeManager -->
  <property>
    <description>A comma separated list of services where service name should only contain a-zA-Z0-9_ and can not start with numbers</description>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>

  <!-- Hostname of the ResourceManager -->
  <property>
    <description>The hostname of the RM.</description>
    <name>yarn.resourcemanager.hostname</name>
    <value>flink02</value>
  </property>
  <!-- Maximum number of ApplicationMaster restart attempts -->
  <property>
    <description>The maximum number of application attempts. It's a global setting for all application masters. Each application master can specify its individual maximum number of application attempts via the API, but the individual number cannot be more than the global upper bound. If it is, the resourcemanager will override it. The default number is set to 2, to allow at least one retry for AM.</description>
    <name>yarn.resourcemanager.am.max-attempts</name>
    <value>4</value>
  </property>

  <!-- Enforce physical memory limits for containers -->
  <property>
    <description>Whether physical memory limits will be enforced for containers.</description>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>true</value>
  </property>

  <!-- Disable virtual memory limit enforcement for containers -->
  <property>
    <description>Whether virtual memory limits will be enforced for containers.</description>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
  </property>
  <!-- Ratio of virtual memory to physical memory -->
  <property>
    <description>Ratio between virtual memory to physical memory when setting memory limits for containers. Container allocations are expressed in terms of physical memory, and virtual memory usage is allowed to exceed this allocation by this ratio.</description>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>5</value>
  </property>

  <!-- Minimum memory per container request -->
  <property>
    <description>The minimum allocation for every container request at the RM, in MBs. Memory requests lower than this will throw an InvalidResourceRequestException.</description>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>1024</value>
  </property>

  <!-- Maximum memory per container request -->
  <property>
    <description>The maximum allocation for every container request at the RM, in MBs. Memory requests higher than this will throw an InvalidResourceRequestException.</description>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>7168</value>
  </property>

  <!-- Minimum virtual CPU cores per container request -->
  <property>
    <description>The minimum allocation for every container request at the RM, in terms of virtual CPU cores. Requests lower than this will throw an InvalidResourceRequestException.</description>
    <name>yarn.scheduler.minimum-allocation-vcores</name>
    <value>1</value>
  </property>

  <!-- Maximum virtual CPU cores per container request -->
  <property>
    <description>The maximum allocation for every container request at the RM, in terms of virtual CPU cores. Requests higher than this will throw an InvalidResourceRequestException.</description>
    <name>yarn.scheduler.maximum-allocation-vcores</name>
    <value>16</value>
  </property>

  <!-- Maximum physical memory the NodeManager may allocate to containers -->
  <property>
    <description>Amount of physical memory, in MB, that can be allocated for containers. If set to -1 and yarn.nodemanager.resource.detect-hardware-capabilities is true, it is automatically calculated (in case of Windows and Linux). In other cases, the default is 8192MB.</description>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>14336</value>
  </property>

  <!-- Maximum number of virtual CPU cores the NodeManager may allocate to containers -->
  <property>
    <description>Number of vcores that can be allocated for containers. This is used by the RM scheduler when allocating resources for containers. This is not used to limit the number of CPUs used by YARN containers. If it is set to -1 and yarn.nodemanager.resource.detect-hardware-capabilities is true, it is automatically determined from the hardware in case of Windows and Linux. In other cases, number of vcores is 8 by default.</description>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>16</value>
  </property>

  <!-- Enable log aggregation -->
  <property>
    <description>Whether to enable log aggregation. Log aggregation collects each container's logs and moves these logs onto a file-system, for e.g. HDFS, after the application completes. Users can configure the "yarn.nodemanager.remote-app-log-dir" and "yarn.nodemanager.remote-app-log-dir-suffix" properties to determine where these logs are moved to. Users can access the logs via the Application Timeline Server.</description>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
  </property>
  <!-- Retention time for container logs on the NodeManager's local disk; here 10800 s = 3 hours.
       Note: this only applies when log aggregation is disabled. -->
  <property>
    <description>Time in seconds to retain user logs. Only applicable if log aggregation is disabled</description>
    <name>yarn.nodemanager.log.retain-seconds</name>
    <value>10800</value>
  </property>
</configuration>
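For these settings to take effect, yarn-site.xml has to be identical on every node and the YARN daemons have to be restarted. A minimal sketch, assuming a three-node cluster flink01..flink03 and a standard $HADOOP_HOME layout (both assumptions, not from the original):

# Copy the edited config to every node (hostnames are assumptions)
for host in flink01 flink02 flink03; do
  scp $HADOOP_HOME/etc/hadoop/yarn-site.xml $host:$HADOOP_HOME/etc/hadoop/
done
# Restart the ResourceManager and all NodeManagers
$HADOOP_HOME/sbin/stop-yarn.sh
$HADOOP_HOME/sbin/start-yarn.sh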
# Start Flink on YARN with the default configuration; the default resources are:
# {masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
yarn-session.sh
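A running session can also be re-attached to later with the -id option of yarn-session.sh; a short sketch (the application ID below is a placeholder):

# Attach a new session client to an already-running Flink YARN session
yarn-session.sh -id application_1463870264508_0029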
############## By default the values in conf/flink-conf.yaml are used; Flink on YARN overrides several of them:
# jobmanager.rpc.address - because the JobManager's location in the cluster is not known in advance, it is in fact the address of the AM;
# taskmanager.tmp.dirs - the temporary directories supplied by YARN are used instead;
# parallelism.default - also overridden if the number of slots is specified on the command line.
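For reference, a minimal flink-conf.yaml sketch of the three entries in question (the values shown are illustrative, not from the original; at runtime the YARN client replaces them as described above):

# conf/flink-conf.yaml (illustrative values)
jobmanager.rpc.address: localhost   # replaced with the host of the AM chosen by YARN
taskmanager.tmp.dirs: /tmp          # replaced with the container directories YARN provides
parallelism.default: 1              # replaced when -s/--slots is passed on the command line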
############## Optional parameters for a customized setup:
# Required
  -n,--container <arg>             Number of YARN container to allocate (=Number of Task Managers)
# Optional
  -D <arg>                         Dynamic properties
  -d,--detached                    Start detached
  -jm,--jobManagerMemory <arg>     Memory for JobManager Container with optional unit (default: MB)
  -nm,--name <arg>                 Set a custom name for the application on YARN
  -q,--query                       Display available YARN resources (memory, cores)
  -qu,--queue <arg>                Specify YARN queue.
  -s,--slots <arg>                 Number of slots per TaskManager
  -tm,--taskManagerMemory <arg>    Memory per TaskManager Container with optional unit (default: MB)
  -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper sub-paths for HA mode

# Example: start 15 TaskManagers and 1 JobManager, give the JobManager 1024 MB of memory and each TaskManager 1024 MB
# and 8 slots, name the application FlinkOnYarnSession, and run detached with -d (without -d the session runs in client mode)
yarn-session.sh -n 15 -jm 1024 -tm 1024 -s 8 -nm FlinkOnYarnSession -d
# Client mode starts a client in the terminal; the terminal must stay connected, since disconnecting it is equivalent to killing the Flink cluster.
# In detached mode the Flink YARN client only submits Flink to YARN and then shuts itself down; to kill the Flink cluster afterwards, use:
yarn application -kill <appId>
# <appId> is the ID of the application published on YARN; it can be looked up in the YARN web UI or with `yarn application -list`.

# For Flink on YARN, the JobManager occupies one container and each TaskManager occupies one container, so:
# number of JobManagers + number of TaskManagers = number of containers requested
# The following example assumes 6 machines with 16 cores and 16 GB of RAM each (2 GB per node reserved for the OS):
yarn.nodemanager.resource.cpu-vcores=16        # each NodeManager contributes 16 cores to the YARN cluster
yarn.nodemanager.resource.memory-mb=14336      # each NodeManager contributes 14 GB of physical memory to the YARN cluster
yarn.scheduler.minimum-allocation-vcores=1     # each container on a NodeManager uses at least 1 core
yarn.scheduler.minimum-allocation-mb=1024      # each container on a NodeManager uses at least 1 GB of physical memory

# If all nodes are dedicated to Flink jobs, a recommended Flink cluster
# (total resources: 14*6 = 84 GB of memory, 16*6 = 96 cores):
yarn-session.sh -n 8 -jm 4096 -tm 3584 -s 16 -nm FlinkOnYarnSession -d
# This occupies 32 GB of memory and 9 cores in total: one 4 GB/1-core JobManager and eight 3.5 GB/1-core/16-slot TaskManagers.

############### Recovery behavior of Flink on YARN
# Flink's YARN client has the following configuration parameters to control how to behave in case of container failures.
# These parameters can be set either from conf/flink-conf.yaml or when starting the YARN session, using -D parameters.
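As an illustration of the -D dynamic properties just mentioned, a hedged sketch that sets Flink's per-application attempt counter for one session (yarn.application-attempts is Flink's counterpart of the global yarn.resourcemanager.am.max-attempts=4 configured earlier and must not exceed it; the -n/-jm/-tm values here are arbitrary):

# Start a session whose ApplicationMaster may be restarted up to 4 times
yarn-session.sh -n 2 -jm 1024 -tm 1024 -Dyarn.application-attempts=4 -d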
In cases where the Flink YARN session fails during the deployment itself, users have to rely on the logging capabilities of Hadoop YARN. The most useful feature for that is YARN log aggregation. To enable it, users have to set the yarn.log-aggregation-enable property to true in the yarn-site.xml file. Once that is enabled, users can use the following command to retrieve all log files of a (failed) YARN session.
yarn logs -applicationId <application ID>
Note that it takes a few seconds after the session has finished until the logs show up.
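A short usage sketch (the application ID is a placeholder; redirecting to a file simply makes the aggregated output easier to search):

# Retrieve the aggregated logs of a finished session and save them for inspection
yarn logs -applicationId application_1563356712345_0001 > flink-session.log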