Preliminary Preparation

# Disable the firewall
# Configure host mappings
# Create the flink user
# Configure passwordless SSH login
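
A sketch of these preparation steps (CentOS 7 style commands, run as root on every node; the IP addresses below are placeholders and must match your environment):

systemctl stop firewalld && systemctl disable firewalld      # disable the firewall
cat >> /etc/hosts <<'EOF'                                    # host mappings for all six nodes
192.168.1.101 flink01
192.168.1.102 flink02
192.168.1.103 flink03
192.168.1.104 flink04
192.168.1.105 flink05
192.168.1.106 flink06
EOF
useradd flink && passwd flink                                # create the flink user
su - flink -c "ssh-keygen -t rsa -N '' -f ~/.ssh/id_rsa"     # key pair for passwordless login
# then distribute the public key, e.g.: for h in flink01 flink02 flink03 flink04 flink05 flink06; do ssh-copy-id flink@$h; done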
# Prepare the required resources:
# hadoop-2.8.5.tar.gz
# flink-1.7.1-bin-hadoop28-scala_2.11
# logback-classic.jar, logback-core.jar, log4j-over-slf4j.jar
# Node layout (it is recommended to reserve 2 GB of memory on each NodeManager node for the OS):
hostname   Resources      Roles
flink01    16G/16cores    NameNode/DataNode/NodeManager
flink02    16G/16cores    ResourceManager/DataNode/NodeManager
flink03    16G/16cores    SecondaryNameNode/DataNode/NodeManager
flink04    16G/16cores    DataNode/NodeManager
flink05    16G/16cores    DataNode/NodeManager
flink06    16G/16cores    DataNode/NodeManager

Hadoop Configuration

Extract the Hadoop package to /data/apps on the flink01 node

tar -zxvf ~/hadoop-2.8.5.tar.gz -C /data/apps

Enter the configuration directory

cd /data/apps/hadoop-2.8.5/etc/hadoop

Set JAVA_HOME in hadoop-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_40

Set JAVA_HOME in yarn-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_40

Set JAVA_HOME in mapred-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_40

Configure slaves

vim slaves    # with the following contents
flink01
flink02
flink03
flink04
flink05
flink06

Configure core-site.xml

<configuration>
<!-- HDFS filesystem URI -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://flink01:9000</value>
</property>

<!-- Hadoop temporary directory -->
<property>
<name>hadoop.tmp.dir</name>
<value>/data/apps/hadoop-2.8.5/tmp</value>
</property>
</configuration>

Configure hdfs-site.xml

<configuration>
<!-- HDFS replication factor -->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>

<!-- Disable HDFS permission checking -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
<!-- Hostname and port of the HDFS web UI -->
<property>
<name>dfs.namenode.http-address</name>
<value>flink01:50070</value>
</property>

<!-- Hostname and port of the SecondaryNameNode web UI -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>flink03:50090</value>
</property>

<property>
<name>dfs.namenode.name.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/name</value>
</property>

<property>
<name>dfs.datanode.data.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/data</value>
</property>
</configuration>

Configure mapred-site.xml

<configuration>
<!-- Run MapReduce on the YARN framework -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>

<!-- Memory requested by a single map task -->
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value>
</property>

<!-- Memory requested by a single reduce task -->
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>4096</value>
</property>

<!-- Uber mode is a Hadoop 2 optimization for small jobs: if a job is small enough, its tasks are run inside a single JVM. -->
<property>
<name>mapreduce.job.ubertask.enable</name>
<value>true</value>
</property>

<!-- JobHistory server web UI address -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>flink01:19888</value>
</property>
</configuration>

Configure yarn-site.xml

<configuration>
<!-- Auxiliary services run by the NodeManager -->
<property>
<description>A comma separated list of services where service name should only
contain a-zA-Z0-9_ and can not start with numbers</description>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<!-- Host on which the ResourceManager runs -->
<property>
<description>The hostname of the RM.</description>
<name>yarn.resourcemanager.hostname</name>
<value>flink02</value>
</property>

<!-- Maximum number of ApplicationMaster attempts -->
<property>
<description>The maximum number of application attempts. It's a global
setting for all application masters. Each application master can specify
its individual maximum number of application attempts via the API, but the
individual number cannot be more than the global upper bound. If it is,
the resourcemanager will override it. The default number is set to 2, to
allow at least one retry for AM.</description>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
</property>

<!-- Enable the physical memory limit check -->
<property>
<description>Whether physical memory limits will be enforced for
containers.</description>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>true</value>
</property>

<!-- Disable the virtual memory limit check -->
<property>
<description>Whether virtual memory limits will be enforced for
containers.</description>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

<!-- Ratio of virtual memory to physical memory -->
<property>
<description>Ratio between virtual memory to physical memory when
setting memory limits for containers. Container allocations are
expressed in terms of physical memory, and virtual memory usage
is allowed to exceed this allocation by this ratio.
</description>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>5</value>
</property>

<!-- Minimum memory per container request -->
<property>
<description>The minimum allocation for every container request at the RM,
in MBs. Memory requests lower than this will throw a
InvalidResourceRequestException.</description>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>1024</value>
</property>

<!-- Maximum memory per container request -->
<property>
<description>The maximum allocation for every container request at the RM,
in MBs. Memory requests higher than this will throw a
InvalidResourceRequestException.</description>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>7168</value>
</property>

<!-- Minimum number of virtual CPU cores per container request -->
<property>
<description>The minimum allocation for every container request at the RM,
in terms of virtual CPU cores. Requests lower than this will throw a
InvalidResourceRequestException.</description>
<name>yarn.scheduler.minimum-allocation-vcores</name>
<value>1</value>
</property>

<!-- Maximum number of virtual CPU cores per container request -->
<property>
<description>The maximum allocation for every container request at the RM,
in terms of virtual CPU cores. Requests higher than this will throw a
InvalidResourceRequestException.</description>
<name>yarn.scheduler.maximum-allocation-vcores</name>
<value>16</value>
</property>

<!-- Maximum physical memory the NodeManager may use for containers -->
<property>
<description>Amount of physical memory, in MB, that can be allocated
for containers.</description>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>14336</value>
</property>

<!-- Maximum number of virtual CPU cores the NodeManager may use for containers -->
<property>
<description>Number of vcores that can be allocated
for containers. This is used by the RM scheduler when allocating
resources for containers. This is not used to limit the number of
CPUs used by YARN containers. If it is set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
automatically determined from the hardware in case of Windows and Linux.
In other cases, number of vcores is 8 by default.</description>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>16</value>
</property>

<!-- Enable log aggregation -->
<property>
<description>Whether to enable log aggregation. Log aggregation collects
each container's logs and moves these logs onto a file-system, for e.g.
HDFS, after the application completes. Users can configure the
"yarn.nodemanager.remote-app-log-dir" and
"yarn.nodemanager.remote-app-log-dir-suffix" properties to determine
where these logs are moved to. Users can access the logs via the
Application Timeline Server.</description>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<!-- Retention time for NodeManager local logs; only applies when log aggregation is disabled -->
<property>
<description>Time in seconds to retain user logs. Only applicable if
log aggregation is disabled</description>
<name>yarn.nodemanager.log.retain-seconds</name>
<value>10800</value>
</property>

</configuration>
Parameter summary (parameter = value: meaning / notes):
- yarn.nodemanager.aux-services = mapreduce_shuffle: auxiliary services run by the NodeManager.
- yarn.resourcemanager.hostname = flink02: host on which the ResourceManager runs.
- yarn.resourcemanager.am.max-attempts = 4: maximum number of ApplicationMaster attempts.
- yarn.nodemanager.pmem-check-enabled = true: enforce physical memory limits; a container whose physical memory usage exceeds its allocation is killed. Default: true.
- yarn.nodemanager.vmem-check-enabled = false: enforce virtual memory limits; a container whose virtual memory usage exceeds its allocation is killed. Default: true. It can be disabled when you are confident there are no memory leaks.
- yarn.scheduler.minimum-allocation-mb = 1024: minimum physical memory (MB) a single container can request; default 1024 MB. If jobs fail with physical-memory-exceeded errors, raise this value.
- yarn.scheduler.maximum-allocation-mb = 7168: maximum physical memory (MB) a single container can request.
- yarn.scheduler.minimum-allocation-vcores = 1: minimum number of virtual CPU cores per container request.
- yarn.scheduler.maximum-allocation-vcores = 16: maximum number of virtual CPU cores per container request.
- yarn.nodemanager.resource.memory-mb = 14336: total physical memory on the node that YARN may use (the amount requested from the OS); default 8192 MB.
- yarn.nodemanager.resource.cpu-vcores = 16: total number of cores on the node that YARN may use; usually set to the value of cat /proc/cpuinfo | grep "processor" | wc -l. Default: 8.
- yarn.log-aggregation-enable = true: enable log aggregation.
- yarn.nodemanager.log.retain-seconds = 10800: retention time for NodeManager local logs; only applies when log aggregation is disabled.
- yarn.nodemanager.vmem-pmem-ratio = 5: for every 1 MB of physical memory a task uses, it may use up to this many MB of virtual memory; default 2.1. Has no effect when the virtual memory check is disabled.

Modify capacity-scheduler.xml

(The total resources used by JobManagers started by Flink YARN sessions are limited by this parameter.)

<property>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <value>0.3</value>
    <description>Maximum percent of cluster resources that can be used to run application masters.</description>
</property>
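
If the cluster is already running when this file is changed, the scheduler can reload it without a restart (a sketch, assuming the install path above; the file only needs to be current on the ResourceManager node, flink02):

scp /data/apps/hadoop-2.8.5/etc/hadoop/capacity-scheduler.xml flink02:/data/apps/hadoop-2.8.5/etc/hadoop/
yarn rmadmin -refreshQueues    # tell the ResourceManager to reload the scheduler configuration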

Quick Hadoop Installation

(After installing with this script, capacity-scheduler.xml still needs to be modified separately.)

Place the installation script and the packages in the same directory, then run the following command to complete all of the configuration steps above.

# Assumes the required packages are already in the current user's home directory (~)
sh ~/install-hadoop.sh

Configure Environment Variables

vim ~/.bash_profile
# Add the following (Flink's environment variables are included here in advance):
export FLINK_HOME=/data/apps/flink-1.7.1
export HADOOP_HOME=/data/apps/hadoop-2.8.5
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$FLINK_HOME/bin
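
Reload the profile so the new variables take effect in the current shell, then verify (JAVA_HOME is assumed to be set elsewhere, e.g. in /etc/profile):

source ~/.bash_profile
echo $HADOOP_HOME    # should print /data/apps/hadoop-2.8.5
hadoop version       # confirms the Hadoop binaries are on PATH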

Start Hadoop

Format the NameNode

hdfs namenode -format

Start HDFS on the NameNode host

start-dfs.sh

Start YARN on the ResourceManager host

start-yarn.sh
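
A quick sanity check after startup (a sketch; the hosts and ports follow the configuration above):

jps    # flink01 should show NameNode, DataNode, NodeManager; flink02 should show ResourceManager, DataNode, NodeManager
hdfs dfsadmin -report | grep "Live datanodes"    # all six DataNodes should be live
# Web UIs: HDFS at http://flink01:50070, YARN at http://flink02:8088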

Flink Cluster

Extract the Flink package to /data/apps on the flink01 node

tar -zxvf ~/flink-1.7.1-bin-hadoop28-scala_2.11.tar.gz -C /data/apps

Enter the configuration directory

cd /data/apps/flink-1.7.1/conf

Configure flink-conf.yaml with the following settings:

jobmanager.rpc.address: flink01
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
parallelism.default: 2
taskmanager.numberOfTaskSlots: 8
state.backend: rocksdb
state.checkpoints.dir: hdfs://flink01:9000/flink-checkpoints
state.savepoints.dir: hdfs://flink01:9000/flink-savepoints
state.backend.incremental: true
io.tmp.dirs: /data/apps/flinkapp/tmp
yarn.application-attempts: 4
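
Because the state backend writes checkpoints and savepoints to HDFS, the target directories can be created up front, and the local io.tmp.dirs path should exist on every node (a sketch; Flink can also create the HDFS directories on demand):

hdfs dfs -mkdir -p /flink-checkpoints /flink-savepoints    # on hdfs://flink01:9000
mkdir -p /data/apps/flinkapp/tmp                           # local temp dir, on each Flink node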

Remove the log4j resources that Flink uses by default

# Remove the log4j and slf4j-log4j12 jars from Flink's lib directory (e.g. log4j-1.2.17.jar and slf4j-log4j12-1.7.15.jar);

# Remove the log4j configuration files from Flink's conf directory (e.g. log4j-cli.properties, log4j-console.properties, log4j.properties, log4j-yarn-session.properties)
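
The two removal steps above as shell commands (a sketch; the exact jar versions in lib/ may differ):

cd /data/apps/flink-1.7.1
rm -f lib/log4j-*.jar lib/slf4j-log4j12-*.jar    # remove the log4j binding jars
rm -f conf/log4j*.properties                     # remove the log4j configuration files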

Switch Flink's logging framework to logback

(1) Add logback-classic.jar, logback-core.jar, and log4j-over-slf4j.jar to Flink's lib directory

(2) Customize the logback configuration and overwrite logback.xml, logback-console.xml, and logback-yarn.xml in Flink's conf directory (an example configuration is shown below)
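
A sketch of both steps, assuming the three jars are still in the current user's home directory and the customized configuration has been saved as my-logback.xml (a hypothetical file name):

cp ~/logback-classic.jar ~/logback-core.jar ~/log4j-over-slf4j.jar /data/apps/flink-1.7.1/lib/
cp my-logback.xml /data/apps/flink-1.7.1/conf/logback.xml
cp my-logback.xml /data/apps/flink-1.7.1/conf/logback-console.xml
cp my-logback.xml /data/apps/flink-1.7.1/conf/logback-yarn.xml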

# Flink started with flink-daemon.sh uses logback.xml;
# Flink started with flink-console.sh uses logback-console.xml;
# Flink started with yarn-session.sh uses logback-yarn.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<!-- Directory where log files are stored; do not use a relative path -->
<property name="LOG_HOME" value="/data/apps/flinkapp/logs"/>

<!-- Output pattern: %d is the date, %thread the thread name, %-5level the level left-padded to 5 characters, %msg the log message, %n a newline -->
<property name="pattern" value="%d{yyyyMMdd:HH:mm:ss.SSS} [%thread] %-5level %msg%n"/>

<!-- Console appender -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>-->
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${pattern}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>

<!-- INFO_FILE -->
<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/info/info.log</file>
<!-- Accept only INFO -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/info/info_%d{yyyy-MM-dd}.log.%i.gz</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>50MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--<maxHistory>30</maxHistory>-->
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${pattern}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>

<!-- ERROR_FILE -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/error/error.log</file>
<!-- Accept only ERROR -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/error/error_%d{yyyy-MM-dd}.log.%i.gz</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>50MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--<maxHistory>30</maxHistory>-->
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${pattern}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>

<logger name="com.haier.flink" level="DEBUG" additivity="true">
<appender-ref ref="INFO_FILE"/>
<appender-ref ref="ERROR_FILE"/>
</logger>

<logger name="java.sql.Connection" level="DEBUG" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<logger name="java.sql.Statement" level="DEBUG" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<logger name="java.sql.PreparedStatement" level="DEBUG" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<!-- Root logger -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>

</configuration>

Session mode requires starting the cluster first and then submitting Flink jobs to it (multiple jobs can be submitted within the same session, either through Flink's web UI or with the flink run command). When the cluster starts it requests a block of resources from YARN, and that allocation never changes. If the resources are used up, the next job cannot be submitted; only after one of the jobs on YARN finishes and releases its resources can the next job be submitted normally.

# Start Flink on YARN with the default configuration (default resources shown below)
# {masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
yarn-session.sh

############## By default the settings in conf/flink-conf.yaml are used; Flink on YARN overrides a few of them:
# jobmanager.rpc.address, because the JobManager's location in the cluster is not known in advance (it is in fact the AM's address);
# taskmanager.tmp.dirs, which is replaced by the temporary directories provided by YARN;
# parallelism.default, if the number of slots is specified on the command line.

############## Optional parameters for a custom configuration:
# Required
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
# Optional
-D <arg> Dynamic properties
-d,--detached Start detached
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-nm,--name Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for HA mode

# Example: start 15 TaskManagers and 1 JobManager; the JobManager gets 1024 MB of memory, each TaskManager gets 1024 MB and 8 slots;
# the application is named FlinkOnYarnSession; -d runs in detached mode (without -d, client mode is used)
yarn-session.sh -n 15 -jm 1024 -tm 1024 -s 8 -nm FlinkOnYarnSession -d

# Client mode starts a client in the terminal; the terminal must stay attached, and closing it is equivalent to killing the Flink cluster.
# In detached mode the Flink YARN client only submits Flink to YARN and then shuts itself down; to kill the Flink cluster, use:
yarn application -kill <appId>
# <appId> is the ID of the application on YARN, which can be looked up in the YARN cluster UI

# With Flink on YARN, a JobManager occupies one container and each TaskManager occupies one container
# number of JobManagers + number of TaskManagers = number of requested containers
# Example with six machines, each 16 cores / 16 GB RAM (2 GB reserved on each node for the OS):
yarn.nodemanager.resource.cpu-vcores=16    each NodeManager node contributes 16 cores to the YARN cluster
yarn.nodemanager.resource.memory-mb=14336  each NodeManager node contributes 14 GB of physical memory to the YARN cluster
yarn.scheduler.minimum-allocation-vcores=1 each container uses at least 1 core
yarn.scheduler.minimum-allocation-mb=1024  each container uses at least 1 GB of physical memory
# If all nodes are dedicated to Flink jobs, a recommended Flink cluster would be:
# (total resources: 14*6 = 84 GB of memory, 16*6 = 96 cores)
yarn-session.sh -n 8 -jm 4096 -tm 3584 -s 16 -nm FlinkOnYarnSession -d
# This uses 32 GB of memory and 9 cores in total: one 4 GB / 1 core JobManager plus eight 3.5 GB / 1 core / 16 slot TaskManagers

############### Recovery behavior of Flink on YARN
# Flink's YARN client has the following configuration parameters to control how to behave in case of container failures. These parameters can be set either from the conf/flink-conf.yaml or when starting the YARN session, using -D parameters

# yarn.reallocate-failed : whether Flink should reallocate failed TaskManager containers; default true
# yarn.maximum-failed-containers : maximum number of failed containers the ApplicationMaster accepts; defaults to the number of TaskManagers (the value of -n)
# yarn.application-attempts : number of ApplicationMaster attempts. If set to 1 (the default), the whole YARN session fails when the ApplicationMaster fails; higher values allow the ApplicationMaster to be restarted that many times
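
Once a session is running, jobs can be submitted to it with flink run (a sketch; /data/abc.jar stands in for your own job jar):

flink run -d /data/abc.jar    # the client finds the running session via the YARN properties file written by yarn-session.sh
flink list                    # list jobs running in the session
flink cancel <jobId>          # cancel a job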

Per-job mode does not require starting a cluster in advance; every submitted Flink job launches its own Flink cluster on YARN.

# TaskManager slot count
This parameter controls how many concurrent slots a TaskManager has. It can be configured in two ways:
- taskmanager.numberOfTaskSlots: change it in conf/flink-conf.yaml; the default is 1, meaning a TaskManager has a single task slot.
- Pass it as a parameter when submitting the job, e.g. --yarnslots 1 sets the number of slots per TaskManager to 1.

# Number of TaskManagers
Note: unlike session mode, per-job mode does not let you specify how many TaskManagers to launch when submitting a job; the number of TaskManagers is computed dynamically from the parallelism at submission time.
It is derived from the highest operator parallelism in the job: for example, if the job's maximum operator parallelism is 10, then parallelism / numberOfTaskSlots (rounded up) is the number of TaskManagers requested from YARN.
####################### Example
# flink run -m yarn-cluster must be specified
# -d starts in detached mode (without it, client mode is used)
# Starts 1 JobManager with 1024 MB of memory
# Each TaskManager gets 4 slots and 1024 MB of memory
# Assuming the highest operator parallelism in abc.jar is 8, 8/4 = 2 TaskManagers will be started
flink run -m yarn-cluster -d --yarnslots 4 -yjm 1024 -ytm 1024 /data/abc.jar

Log Files

In cases where the Flink YARN session fails during the deployment itself, users have to rely on the logging capabilities of Hadoop YARN. The most useful feature for that is the YARN log aggregation. To enable it, users have to set the yarn.log-aggregation-enable property to true in the yarn-site.xml file. Once that is enabled, users can use the following command to retrieve all log files of a (failed) YARN session.

yarn logs -applicationId <application ID>

Note that it takes a few seconds after the session has finished until the logs show up.