Flink 1.14 Installation and Deployment in Multiple Modes


Flink supports several installation modes:

- Local — single-machine mode, used for learning and testing

- Standalone — self-contained Flink cluster, used for development and test environments

- Standalone HA — self-contained Flink cluster with high availability, used for development and test environments

- On YARN — compute resources are managed by Hadoop YARN, used in production

1. Local Mode

1.1 How it works

A Flink program is submitted by the JobClient. The JobClient hands the job to the JobManager, which coordinates resource allocation and job execution. Once resources have been allocated, tasks are submitted to the appropriate TaskManagers, and each TaskManager starts a thread to execute them. The TaskManagers report status changes back to the JobManager, such as started, running, or finished. When the job is done, the result is sent back to the client (the JobClient).

1.2 Steps

1. Download the installation package

Index of /dist/flink: https://archive.apache.org/dist/flink/

2. Upload flink-1.14.0-bin-scala_2.12.tgz to the target directory on node1

https://archive.apache.org/dist/flink/flink-1.14.0/flink-1.14.0-bin-scala_2.12.tgz

3. Extract the archive

tar -zxvf flink-1.14.0-bin-scala_2.12.tgz

4. If you run into permission problems, fix the ownership

chown -R root:root flink-1.14.0

5. Configure the environment variables

export JAVA_HOME=/data/soft/jdk1.8.0_201
export FLINK_HOME=/data/soft/flink-1.14.0
export PATH=$FLINK_HOME/bin:$JAVA_HOME/bin:$PATH
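A minimal sketch of putting these exports in place, assuming you keep them in /etc/profile (as the later steps in this guide do) and that the JDK and Flink really live under /data/soft:

cat >> /etc/profile <<'EOF'
export JAVA_HOME=/data/soft/jdk1.8.0_201
export FLINK_HOME=/data/soft/flink-1.14.0
export PATH=$FLINK_HOME/bin:$JAVA_HOME/bin:$PATH
EOF
# Reload the profile in the current shell and confirm the binaries are found
source /etc/profile
which flink
java -version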

6. Start Flink in local mode

[root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node1.
Starting taskexecutor daemon on host node1.
[root@node1 flink-1.14.0]# jps
29090 NameNode
29286 DataNode
24182 Jps
23785 StandaloneSessionClusterEntrypoint
24106 TaskManagerRunner
29614 SecondaryNameNode

Flink's web UI listens on port 8081 by default.

http://192.168.56.15:8081/#/overview

In Flink, a slot can be thought of as a resource group: Flink executes a program in parallel by splitting it into subtasks and scheduling those subtasks into slots.
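If you prefer the command line to the web UI, the same information is exposed by the JobManager's REST API on the same port; a minimal sketch, assuming the local cluster started in step 6 is still running:

# Cluster summary: number of TaskManagers, total and available slots, job counts
curl -s http://localhost:8081/overview
# Details for each registered TaskManager, including its slots
curl -s http://localhost:8081/taskmanagers
# All jobs known to this cluster
curl -s http://localhost:8081/jobs/overview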

7. Run the official example

[root@common soft]# /data/soft/flink-1.14.0/bin/flink run /data/soft/flink-1.14.0/examples/batch/WordCount.jar --input /data/soft/words.txt --output /data/soft/out.txt
Job has been submitted with JobID 3a0f1f4c39da135aa6f994c78942a33b
Program execution finished
Job with JobID 3a0f1f4c39da135aa6f994c78942a33b has finished.
Job Runtime: 1976 ms

The input file was created with vim /data/soft/words.txt and contains:

hello me you her
hello me you
hello me
hello
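For reference, here is the same run laid out end to end, from creating the input file to inspecting the result; a minimal sketch that assumes the paths used above:

# 1. Create the input file
cat > /data/soft/words.txt <<'EOF'
hello me you her
hello me you
hello me
hello
EOF
# 2. Submit the batch WordCount job to the local cluster
/data/soft/flink-1.14.0/bin/flink run \
  /data/soft/flink-1.14.0/examples/batch/WordCount.jar \
  --input /data/soft/words.txt --output /data/soft/out.txt
# 3. Inspect the result; depending on parallelism, out.txt may be a single file
#    or a directory with one file per parallel subtask
cat /data/soft/out.txt 2>/dev/null || cat /data/soft/out.txt/*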

 

 

2. Standalone Cluster Mode

2.1 How it works

The client submits the job to the JobManager. The JobManager requests the resources the job needs and manages both the job and those resources; it then distributes tasks to the TaskManagers for execution. The TaskManagers report their status to the JobManager periodically.

2.2 Steps

1. Cluster planning:

- Server node1 (Master + Slave): JobManager + TaskManager

- Server node2 (Slave): TaskManager

- Server node3 (Slave): TaskManager

A Hadoop environment is required, so Hadoop and YARN must be installed first.

Installation guide:

Hadoop 3.3.1 distributed cluster deployment: https://blog.csdn.net/wtl1992/article/details/121441070

Cluster layout used in that guide:

- node1 (192.168.56.10): NameNode, SecondaryNameNode, DataNode, ResourceManager
- node2 (192.168.56.11): DataNode, NodeManager
- node3 (192.168.56.12): DataNode, NodeManager
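Before moving on, it is worth a quick sanity check that HDFS and YARN are actually up; a minimal sketch, assuming the Hadoop 3.3.1 layout from the guide linked above:

# HDFS health and capacity report (the first lines are enough)
/data/soft/hadoop-3.3.1/bin/hdfs dfsadmin -report | head -n 20
# NodeManagers that have registered with the ResourceManager
/data/soft/hadoop-3.3.1/bin/yarn node -list
# Java daemons running on this node
jps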

2. Edit flink-conf.yaml

vim /data/soft/flink-1.14.0/conf/flink-conf.yaml

jobmanager.rpc.address: node1
taskmanager.numberOfTaskSlots: 4
web.submit.enable: true
# history server
jobmanager.archive.fs.dir: hdfs://node1:9000/flink/completed-jobs/
historyserver.web.address: node1
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node1:9000/flink/completed-jobs/
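The archive directory referenced by jobmanager.archive.fs.dir and historyserver.archive.fs.dir lives in HDFS; creating it up front is optional but rules out permission surprises when the history server starts. A minimal sketch, assuming HDFS is reachable at hdfs://node1:9000:

# Pre-create the completed-jobs archive directory and check it
/data/soft/hadoop-3.3.1/bin/hdfs dfs -mkdir -p /flink/completed-jobs/
/data/soft/hadoop-3.3.1/bin/hdfs dfs -ls /flink/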

3. Edit masters

vim /data/soft/flink-1.14.0/conf/masters

node1:8081

4. Edit workers

vim /data/soft/flink-1.14.0/conf/workers

node1
node2
node3

5. Add the HADOOP_CONF_DIR environment variable

vim /etc/profile

export HADOOP_CONF_DIR=/data/soft/hadoop-3.3.1/etc/hadoop

6. Distribute to the other nodes

rsync -av flink-1.14.0 root@node2:/data/soft
rsync -av flink-1.14.0 root@node3:/data/soft
rsync -av /etc/profile root@node2:/etc/
rsync -av /etc/profile root@node3:/etc/

Then run the following on node2 and node3:

source /etc/profile

2.3 Test

1. Start the cluster; on node1 run:

[root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node1.
Starting taskexecutor daemon on host node1.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
[root@node1 flink-1.14.0]# jps
29090 NameNode
1045 StandaloneSessionClusterEntrypoint
29286 DataNode
1465 TaskManagerRunner
29614 SecondaryNameNode
1583 Jps

Or start the daemons individually:

/data/soft/flink-1.14.0/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
/data/soft/flink-1.14.0/bin/taskmanager.sh start|start-foreground|stop|stop-all

[root@node1 flink-1.14.0]# jps
14336 Jps
29090 NameNode
12756 StandaloneSessionClusterEntrypoint
29286 DataNode
13111 TaskManagerRunner
29614 SecondaryNameNode

2. Start the history server

/data/soft/flink-1.14.0/bin/historyserver.sh start

The startup log will contain the following error:

Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.

Fix:

Download the following jar:

flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar (the Flink 1.14.0 / Hadoop 3.x compatibility jar): https://download.csdn.net/download/wtl1992/41702901

and place it under $FLINK_HOME/lib.
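As an alternative (or in addition) to dropping the uber jar into lib, Flink can also pick up the Hadoop classes from the local Hadoop installation through the HADOOP_CLASSPATH variable; a minimal sketch, assuming the hadoop command is on the PATH:

# Expose Hadoop's own jars to Flink
export HADOOP_CLASSPATH=$(hadoop classpath)
# Persist the setting for future shells
echo 'export HADOOP_CLASSPATH=$(hadoop classpath)' >> /etc/profile
# Restart the Flink daemons so they start with the new classpath
/data/soft/flink-1.14.0/bin/stop-cluster.sh
/data/soft/flink-1.14.0/bin/start-cluster.sh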

3. Open the Flink web UI, or check with jps

http://192.168.56.10:8081/#/overview

http://192.168.56.10:8082/#/overview

TaskManagers page: shows how many TaskManagers the Flink cluster currently has, and how many slots, how much memory and how many CPU cores each one offers.

3. Standalone HA (High-Availability) Cluster Mode

3.1 How it works

The architecture described so far clearly has a single point of failure (SPOF): the JobManager. Since the JobManager is responsible for task scheduling and resource allocation, the consequences of losing it are easy to imagine.

With ZooKeeper's help, a standalone Flink cluster runs multiple JobManagers at the same time, only one of which is active while the rest stand by. When the active JobManager is lost (for example, the machine goes down or the process crashes), ZooKeeper elects one of the standby JobManagers to take over the cluster.

3.2 Steps

1. Cluster planning

- Server node1 (Master + Slave): JobManager + TaskManager

- Server node2 (Master + Slave): JobManager + TaskManager

- Server node3 (Slave): TaskManager

2. Start ZooKeeper

Installation guide:

ZooKeeper cluster installation: https://blog.csdn.net/wtl1992/article/details/120392696 (PrettyZoo, a desktop ZooKeeper client: https://github.com/vran-dev/PrettyZoo/releases)

3. Start HDFS

/data/soft/hadoop-3.3.1/sbin/start-dfs.sh

4. Stop the Flink cluster

[root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 1465) on host node1.
Stopping taskexecutor daemon (pid: 26295) on host node2.
Stopping taskexecutor daemon (pid: 26675) on host node3.
Stopping standalonesession daemon (pid: 1045) on host node1.

5. Edit flink-conf.yaml

vim /data/soft/flink-1.14.0/conf/flink-conf.yaml

Add the following:

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://node1:9000/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://node1:9000/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181

Explanation of the settings:

# Use the filesystem state backend, so snapshots are stored on a file system
state.backend: filesystem
# Checkpoint directory: snapshots are saved to HDFS
state.backend.fs.checkpointdir: hdfs://node1:9000/flink-checkpoints
# Use ZooKeeper for high availability
high-availability: zookeeper
# Store the JobManager metadata in HDFS
high-availability.storageDir: hdfs://node1:9000/flink/ha/
# The ZooKeeper ensemble addresses
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181

6. Edit masters

vim /data/soft/flink-1.14.0/conf/masters

node1:8081
node2:8081

7. Sync to the other nodes

rsync -av /data/soft/flink-1.14.0 root@node2:/data/soft
rsync -av /data/soft/flink-1.14.0 root@node3:/data/soft

8. Edit flink-conf.yaml on node2

vim /data/soft/flink-1.14.0/conf/flink-conf.yaml

jobmanager.rpc.address: node2
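Before restarting the cluster, it can help to pre-create the HDFS paths referenced above and to confirm that the ZooKeeper ensemble is healthy; a minimal sketch, assuming zkServer.sh is on the PATH of each ZooKeeper node:

# Pre-create the checkpoint and HA storage directories (optional; avoids permission surprises)
/data/soft/hadoop-3.3.1/bin/hdfs dfs -mkdir -p /flink-checkpoints /flink/ha
/data/soft/hadoop-3.3.1/bin/hdfs dfs -ls /
# On each of node1/node2/node3: report whether this ZooKeeper server is leader or follower
zkServer.sh status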

9. Restart the Flink cluster; on node1 run:

[root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/stop-cluster.sh
No taskexecutor daemon (pid: 24106) is running anymore on node1.
No taskexecutor daemon (pid: 26122) is running anymore on node2.
No taskexecutor daemon (pid: 26548) is running anymore on node3.
No standalonesession daemon (pid: 23785) is running anymore on node1.
No standalonesession daemon to stop on host node2.
[root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host node1.
Starting standalonesession daemon on host node2.
Starting taskexecutor daemon on host node1.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.

 

Note:

When you check with jps, you may find that no Flink processes have been started at all.

This is because, since Flink 1.8, the official Flink distribution no longer bundles the jar that integrates with HDFS.

Fix:

Download the following jar:

flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar (the Flink 1.14.0 / Hadoop 3.x compatibility jar): https://download.csdn.net/download/wtl1992/41702901

and place it under $FLINK_HOME/lib.

Then distribute it to node2 and node3 as well.
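Concretely, distributing the jar can look like the following sketch (the jar name is the one downloaded above; adjust the glob if yours differs):

# Copy the Hadoop uber jar to the other nodes' lib directories
rsync -av /data/soft/flink-1.14.0/lib/flink-shaded-hadoop-3-uber-*.jar root@node2:/data/soft/flink-1.14.0/lib/
rsync -av /data/soft/flink-1.14.0/lib/flink-shaded-hadoop-3-uber-*.jar root@node3:/data/soft/flink-1.14.0/lib/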

3.3 Test

1. Open the Web UI

http://192.168.56.10:8081/#/job-manager/config

http://192.168.56.11:8081/#/job-manager/config
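To see which of the two JobManagers currently holds leadership, you can grep the standalone-session logs for the leadership grant message; a hedged sketch, assuming the default log file naming under $FLINK_HOME/log:

# On node1 and node2: the active JobManager logs that it was granted leadership
grep -i "leadership" /data/soft/flink-1.14.0/log/flink-*-standalonesession-*.log | tail -n 5
# Both REST endpoints should answer while the cluster is healthy
curl -s http://node1:8081/overview
curl -s http://node2:8081/overview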

2. Run the WordCount example

[root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/flink run /data/soft/flink-1.14.0/examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 1eb0896a4ba2ba995dc88ece3dcd8fc5
Program execution finished
Job with JobID 1eb0896a4ba2ba995dc88ece3dcd8fc5 has finished.
Job Runtime: 1815 ms
Accumulator Results:
- 98d0871f23ab6fc907e47dae6d036868 (java.util.ArrayList) [170 elements]
(a,5) (action,1) (after,1) (against,1) (all,2) (and,12) (arms,1) (arrows,1) (awry,1) (ay,1) (bare,1) (be,4) (bear,3) (bodkin,1) (bourn,1) (but,1) (by,2) (calamity,1) (cast,1) (coil,1) (come,1) (conscience,1) (consummation,1) (contumely,1) (country,1) (cowards,1) (currents,1) (d,4) (death,2) (delay,1) (despis,1) (devoutly,1) (die,2) (does,1) (dread,1) (dream,1) (dreams,1) (end,2) (enterprises,1) (er,1) (fair,1) (fardels,1) (flesh,1) (fly,1) (for,2) (fortune,1) (from,1) (give,1) (great,1) (grunt,1) (have,2) (he,1) (heartache,1) (heir,1) (himself,1) (his,1) (hue,1) (ills,1) (in,3) (insolence,1) (is,3) (know,1) (law,1) (life,2) (long,1) (lose,1) (love,1) (make,2) (makes,2) (man,1) (may,1) (merit,1) (might,1) (mind,1) (moment,1) (more,1) (mortal,1) (must,1) (my,1) (name,1) (native,1) (natural,1) (no,2) (nobler,1) (not,2) (now,1) (nymph,1) (o,1) (of,15) (off,1) (office,1) (ophelia,1) (opposing,1) (oppressor,1) (or,2) (orisons,1) (others,1) (outrageous,1) (pale,1) (pangs,1) (patient,1) (pause,1) (perchance,1) (pith,1) (proud,1) (puzzles,1) (question,1) (quietus,1) (rather,1) (regard,1) (remember,1) (resolution,1) (respect,1) (returns,1) (rub,1) (s,5) (say,1) (scorns,1) (sea,1) (shocks,1) (shuffled,1) (sicklied,1) (sins,1) (sleep,5) (slings,1) (so,1) (soft,1) (something,1) (spurns,1) (suffer,1) (sweat,1) (take,1) (takes,1) (than,1) (that,7) (the,22) (their,1) (them,1) (there,2) (these,1) (this,2) (those,1) (thought,1) (thousand,1) (thus,2) (thy,1) (time,1) (tis,2) (to,15) (traveller,1) (troubles,1) (turn,1) (under,1) (undiscover,1) (unworthy,1) (us,3) (we,4) (weary,1) (what,1) (when,2) (whether,1) (whips,1) (who,2) (whose,1) (will,1) (w

  

3. Kill one of the masters

[root@node1 flink-1.14.0]# jps
25584 TaskManagerRunner
29090 NameNode
6261 HistoryServer
12485 QuorumPeerMain
11973 Jps
29286 DataNode
25179 StandaloneSessionClusterEntrypoint
29614 SecondaryNameNode
[root@node1 flink-1.14.0]# kill -kill 25179
[root@node1 flink-1.14.0]# jps
25584 TaskManagerRunner
29090 NameNode
6261 HistoryServer
12485 QuorumPeerMain
29286 DataNode
12397 Jps
29614 SecondaryNameNode

4. Run WordCount again; it still executes normally

[root@node2 flink-1.14.0]# /data/soft/flink-1.14.0/bin/flink run /data/soft/flink-1.14.0/examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 73cad18e17d962e26ce57aed3b0bdc86
Program execution finished
Job with JobID 73cad18e17d962e26ce57aed3b0bdc86 has finished.
Job Runtime: 860 ms
Accumulator Results:
- 7ce63948462d538a891e144b030e5724 (java.util.ArrayList) [170 elements]
(a,5) (action,1) (after,1) (against,1) (all,2) (and,12) (arms,1) (arrows,1) (awry,1) (ay,1) (bare,1) (be,4) (bear,3) (bodkin,1) (bourn,1) (but,1) (by,2) (calamity,1) (cast,1) (coil,1) (come,1) (conscience,1) (consummation,1) (contumely,1) (country,1) (cowards,1) (currents,1) (d,4) (death,2) (delay,1) (despis,1) (devoutly,1) (die,2) (does,1) (dread,1) (dream,1) (dreams,1) (end,2) (enterprises,1) (er,1) (fair,1) (fardels,1) (flesh,1) (fly,1) (for,2) (fortune,1) (from,1) (give,1) (great,1) (grunt,1) (have,2) (he,1) (heartache,1) (heir,1) (himself,1) (his,1) (hue,1) (ills,1) (in,3) (insolence,1) (is,3) (know,1) (law,1) (life,2) (long,1) (lose,1) (love,1) (make,2) (makes,2) (man,1) (may,1) (merit,1) (might,1) (mind,1) (moment,1) (more,1) (mortal,1) (must,1) (my,1) (name,1) (native,1) (natural,1) (no,2) (nobler,1) (not,2) (now,1) (nymph,1) (o,1) (of,15) (off,1) (office,1) (ophelia,1) (opposing,1) (oppressor,1) (or,2) (orisons,1) (others,1) (outrageous,1) (pale,1) (pangs,1) (patient,1) (pause,1) (perchance,1) (pith,1) (proud,1) (puzzles,1) (question,1) (quietus,1) (rather,1) (regard,1) (remember,1) (resolution,1) (respect,1) (returns,1) (rub,1) (s,5) (say,1) (scorns,1) (sea,1) (shocks,1) (shuffled,1) (sicklied,1) (sins,1) (sleep,5) (slings,1) (so,1) (soft,1) (something,1) (spurns,1) (suffer,1) (sweat,1) (take,1) (takes,1) (than,1) (that,7) (the,22) (their,1) (them,1) (there,2) (these,1) (this,2) (those,1) (thought,1) (thousand,1) (thus,2) (thy,1) (time,1) (tis,2) (to,15) (traveller,1) (troubles,1) (turn,1) (under,1) (undiscover,1) (unworthy,1) (us,3) (we,4) (weary,1) (what,1) (when,2) (whether,1) (whips,1) (who,2) (whose,1) (will,1) (wish,1) (

5. Stop the cluster

[root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 25584) on host node1.
Stopping taskexecutor daemon (pid: 15256) on host node2.
Stopping taskexecutor daemon (pid: 15288) on host node3.
No standalonesession daemon (pid: 25179) is running anymore on node1.
Stopping standalonesession daemon (pid: 14795) on host node2.

4. Flink on YARN Mode

4.1 How it works

4.1.1 Why use Flink on YARN?

In real-world development, Flink is most commonly run in Flink on YARN mode, for the following reasons:

1. YARN resources can be used on demand, which improves cluster resource utilization.

2. YARN jobs have priorities, and jobs run according to those priorities.

3. YARN's scheduler automatically handles failover of the individual roles:

○ Both the JobManager process and the TaskManager processes are monitored by the YARN NodeManager.

○ If the JobManager process exits abnormally, the YARN ResourceManager reschedules the JobManager to another machine.

○ If a TaskManager process exits abnormally, the JobManager is notified, requests new resources from the YARN ResourceManager, and restarts the TaskManager.

4.1.2 How does Flink interact with YARN?

1. The client uploads the Flink jars and configuration files to HDFS.

2. The client submits the job to the YARN ResourceManager and requests resources.

3. The ResourceManager allocates a container and starts the ApplicationMaster; the ApplicationMaster then loads the Flink jars and configuration, sets up the environment, and starts the JobManager.

The JobManager and the ApplicationMaster run in the same container.

Once they have started successfully, the ApplicationMaster knows the address of the JobManager (its own machine).

It generates a new Flink configuration file for the TaskManagers so that they can connect to the JobManager.

That configuration file is also uploaded to HDFS.

In addition, the ApplicationMaster container serves Flink's web interface.

All ports allocated by YARN are ephemeral ports, which allows users to run multiple Flink applications in parallel.

4. The ApplicationMaster requests worker resources from the ResourceManager; the NodeManagers load the Flink jars and configuration, set up the environment, and start the TaskManagers.

5. Once started, each TaskManager sends heartbeats to the JobManager and waits for the JobManager to assign it tasks.
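The interaction described in steps 1-5 can be observed with the ordinary YARN tooling once a Flink application is running; a minimal sketch (the application id below is a placeholder, use the one printed by your own submission):

# List the Flink applications known to the ResourceManager
yarn application -list
# Aggregated container logs of one application: the JobManager (AM) and all TaskManagers
yarn logs -applicationId application_XXXXXXXXXXXXX_0001 | less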

4.1.3 Two deployment modes

(1) Session mode

Characteristics: resources are requested up front, and the JobManager and TaskManagers are started in advance.

Advantage: jobs do not have to request resources at every submission; they reuse the resources that were already allocated, which improves execution efficiency.

Disadvantage: resources are not released when a job finishes, so they remain occupied.

Use case: suitable when jobs are submitted frequently and most jobs are small.

(2) Per-Job mode

Characteristics: resources are requested for every job submission.

Advantage: resources are released as soon as the job finishes; nothing stays occupied.

Disadvantage: every submission must request resources first, which takes time and hurts execution efficiency.

Use case: suitable when there are few jobs, or when jobs are large.

4.2 Steps

1. Disable YARN's memory checks

vim /data/soft/hadoop-3.3.1/etc/hadoop/yarn-site.xml

Add:

<property>
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>false</value>
</property>
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>

Explanation:

These settings control whether a thread is started that checks the amount of virtual memory each task is using; if a task exceeds its allocation, it is killed immediately. The default is true.

We disable the checks here because Flink on YARN can easily exceed the memory limits, and YARN would then kill the job automatically.

2. Sync to the other nodes

rsync -av /data/soft/hadoop-3.3.1 root@node2:/data/soft
rsync -av /data/soft/hadoop-3.3.1 root@node3:/data/soft

3. Restart YARN

[root@node1 hadoop-3.3.1]# /data/soft/hadoop-3.3.1/sbin/stop-yarn.sh
Stopping nodemanagers
Last login: Sat Nov 13 16:10:02 EST 2021 from 192.168.56.1 on pts/1
Stopping resourcemanager
Last login: Sat Nov 13 16:46:03 EST 2021 on pts/0
[root@node1 hadoop-3.3.1]# /data/soft/hadoop-3.3.1/sbin/start-yarn.sh
Starting resourcemanager
Last login: Sat Nov 13 16:46:07 EST 2021 on pts/0
Starting nodemanagers
Last login: Sat Nov 13 16:46:44 EST 2021 on pts/0

 

4.3 Test

4.3.1 Session mode

yarn-session.sh (allocate the resources) + flink run (submit the jobs)

1. Start a Flink session on YARN; on node1 run:

/data/soft/flink-1.14.0/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d

Explanation:

This requests 2 CPUs and 1600 MB of memory in total.

# -n  the number of containers to request, i.e. the number of TaskManagers
# -tm the memory size of each TaskManager
# -s  the number of slots per TaskManager
# -d  run as a background (detached) process
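With the session running detached, jobs are submitted with a plain flink run (the client locates the session through the YARN properties file that yarn-session.sh wrote), and the whole session is torn down through YARN; a minimal sketch in which the application id is a placeholder:

# Submit a job into the running YARN session
/data/soft/flink-1.14.0/bin/flink run /data/soft/flink-1.14.0/examples/batch/WordCount.jar
# List the jobs the session currently knows about
/data/soft/flink-1.14.0/bin/flink list
# When finished, stop the session by killing its YARN application
yarn application -list
yarn application -kill application_XXXXXXXXXXXXX_0001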

Note:

You may see an error saying that the last block does not have enough replicas, so the file cannot be closed.

Stack trace:

java.io.IOException: Unable to close file because the last block does not have enough number of replicas.
    at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2528)
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2495)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2458)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)

Fix:

vim /data/soft/hadoop-3.3.1/etc/hadoop/core-site.xml

When the NameNode receives a close-file request from an HDFS client, it checks the completion status of the last block; the file can only be closed once enough DataNodes have reported the block as complete.

Network I/O latency, GC pauses, and similar issues can all delay those DataNode reports.

The HDFS client retries closing the file several times; increasing the retry count reduces how often this problem occurs.

Add the following:

<property>
  <name>dfs.client.block.write.locateFollowingBlock.retries</name>
  <value>10</value>
</property>
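After editing core-site.xml (and syncing it to the other nodes), you can confirm that the client actually resolves the new value; a minimal sketch:

# Print the effective value of the retry setting as the HDFS client sees it
hdfs getconf -confKey dfs.client.block.write.locateFollowingBlock.retries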

The following warning can be ignored:

WARN  org.apache.hadoop.hdfs.DFSClient  - Caught exception  java.lang.InterruptedException

4.3.2 Per-Job (detached) mode

1. Submit the job directly:

/data/soft/flink-1.14.0/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /data/soft/flink-1.14.0/examples/batch/WordCount.jar

# -m         the JobManager address; yarn-cluster submits the job to YARN
# -yjm 1024  the JobManager memory
# -ytm 1024  the TaskManager memory

Note: the same "Unable to close file because the last block does not have enough number of replicas" error, the same core-site.xml fix (dfs.client.block.write.locateFollowingBlock.retries set to 10), and the same ignorable DFSClient InterruptedException warning described for Session mode above apply to Per-Job submissions as well; likewise, the flink-shaded-hadoop-3-uber jar must be present in $FLINK_HOME/lib as described earlier.

After the job finishes, a warning like the following is reported:

Exception in thread "Thread-8" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.

Fix:

Add the following to flink-conf.yaml:

classloader.check-leaked-classloader: false
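A minimal sketch of applying that setting from the shell and double-checking it:

# Append the option to the Flink configuration on the machine you submit from
echo 'classloader.check-leaked-classloader: false' >> /data/soft/flink-1.14.0/conf/flink-conf.yaml
# Verify
grep check-leaked-classloader /data/soft/flink-1.14.0/conf/flink-conf.yaml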

 


