Flink1.14多种模式安装部署 |
您所在的位置:网站首页 › uber客户端下载 › Flink1.14多种模式安装部署 |
Flink支持多种安装模式 - Local—本地单机模式,学习测试时使用 - Standalone—独立集群模式,Flink自带集群,开发测试环境使用 - StandaloneHA—独立集群高可用模式,Flink自带集群,开发测试环境使用 - On Yarn—计算资源统一由Hadoop YARN管理,生产环境使用 1、Local本地模式 1.1 原理1.下载安装包 Index of /dist/flink 2.上传flink-1.14.0-bin-scala_2.11.tgz到node1的指定目录 https://archive.apache.org/dist/flink/flink-1.14.0/flink-1.14.0-bin-scala_2.12.tgz 4.如果出现权限问题,需要修改权限 chown -R root:root flink-1.14.05.配置环境变量 export JAVA_HOME=/data/soft/jdk1.8.0_201 export FLINK_HOME=/data/soft/flink-1.14.0 export PATH=$FLINK_HOME/bin:$JAVA_HOME/bin:$PATH6.启动flink以local本地方式 [root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host node1. Starting taskexecutor daemon on host node1. [root@node1 flink-1.14.0]# jps 29090 NameNode 29286 DataNode 24182 Jps 23785 StandaloneSessionClusterEntrypoint 24106 TaskManagerRunner 29614 SecondaryNameNodeflink默认的web ui界面的端口为8081!!! http://192.168.56.15:8081/#/overview slot在Flink里面可以认为是资源组,Flink是通过将任务分成子任务并且将这些子任务分配到slot来并行执行程序。 7.执行官方示例 [root@common soft]# /data/soft/flink-1.14.0/bin/flink run /data/soft/flink-1.14.0/examples/batch/WordCount.jar --input /data/soft/words.txt --output /data/soft/out.txt Job has been submitted with JobID 3a0f1f4c39da135aa6f994c78942a33b Program execution finished Job with JobID 3a0f1f4c39da135aa6f994c78942a33b has finished. Job Runtime: 1976 ms vim /data/soft/words.txt hello me you her hello me you hello me hello
- 服务器: node1(Master + Slave): JobManager + TaskManager - 服务器: node2(Slave): TaskManager - 服务器: node3(Slave): TaskManager 需要hadoop环境,则要安装hadoop和yarn!!! 安装教程如下: Hadoop3.3.1最新版本安装分布式集群部署_wtl1992的博客-CSDN博客集群规划:机器ip分配节点node1192.168.56.10NameNode、SecondaryNameNode、DataNode、ResourceManagernode2192.168.56.11DataNode、NodeManagernode3192.168.56.12DataNode、NodeManager1、首先从Apache Hadoophttp://hadoop.apache.org/下载最新版.. vim /etc/profile export HADOOP_CONF_DIR=/data/soft/hadoop-3.3.1/etc/hadoop 6.分发 rsync -av flink-1.14.0 root@node2:/data/soft rsync -av flink-1.14.0 root@node3:/data/soft rsync -av /etc/profile root@node2:/etc/ rsync -av /etc/profile root@node3:/etc/然后在node2、node3上执行: source /etc/profile 2.3.测试 1.启动集群,在node1上执行如下命令 [root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host node1. Starting taskexecutor daemon on host node1. Starting taskexecutor daemon on host node2. Starting taskexecutor daemon on host node3. [root@node1 flink-1.14.0]# jps 29090 NameNode 1045 StandaloneSessionClusterEntrypoint 29286 DataNode 1465 TaskManagerRunner 29614 SecondaryNameNode 1583 Jps或者单独启动 /data/soft/flink-1.14.0/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all /data/soft/flink-1.14.0/bin/taskmanager.sh start|start-foreground|stop|stop-all [root@node1 flink-1.14.0]# jps 14336 Jps 29090 NameNode 12756 StandaloneSessionClusterEntrypoint 29286 DataNode 13111 TaskManagerRunner 29614 SecondaryNameNode 2.启动历史服务器 /data/soft/flink-1.14.0/bin/historyserver.sh start启动日志里会报错: 报错信息如下: Hadoop is not in the classp ath/dependencies. The extended set of supported File Systems via Hadoop is not available. 解决办法: 下载如下jar包: flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar.tar.gz-Java文档类资源-CSDN下载flink1.14.0与hadoop3.x的兼容包,放在flink的lib目录下更多下载资源、学习资料请访问CSDN下载频道. 放到$FLINK_HOME/lib下面即可!!! http://192.168.56.10:8081/#/overview http://192.168.56.10:8082/#/overview TaskManager界面:可以查看到当前Flink集群中有多少个TaskManager,每个TaskManager的slots、内存、CPU Core是多少 3、Standalone-HA高可用集群模式 3.1 原理从之前的架构中我们可以很明显的发现 JobManager 有明显的单点问题(SPOF,single point of failure)。JobManager 肩负着任务调度以及资源分配,一旦 JobManager 出现意外,其后果可想而知。 在 Zookeeper 的帮助下,一个 Standalone的Flink集群会同时有多个活着的 JobManager,其中只有一个处于工作状态,其他处于 Standby 状态。当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从 Standby 中选一个新的 JobManager 来接管 Flink 集群。 3.2 操作 1.集群规划- 服务器: node1(Master + Slave): JobManager + TaskManager - 服务器: node2(Master + Slave): JobManager + TaskManager - 服务器: node3(Slave): TaskManager 2.启动ZooKeeper安装教程参照如下: Zookeeper集群的安装_wtl1992的博客-CSDN博客下载Releases · vran-dev/PrettyZoo · GitHubhttps://github.com/vran-dev/PrettyZoo/releases 增加如下内容: state.backend: filesystem state.backend.fs.checkpointdir: hdfs://node1:9000/flink-checkpoints high-availability: zookeeper high-availability.storageDir: hdfs://node1:9000/flink/ha/ high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
9.重新启动Flink集群,node1上执行 [root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/stop-cluster.sh No taskexecutor daemon (pid: 24106) is running anymore on node1. No taskexecutor daemon (pid: 26122) is running anymore on node2. No taskexecutor daemon (pid: 26548) is running anymore on node3. No standalonesession daemon (pid: 23785) is running anymore on node1. No standalonesession daemon to stop on host node2. [root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/start-cluster.sh Starting HA cluster with 2 masters. Starting standalonesession daemon on host node1. Starting standalonesession daemon on host node2. Starting taskexecutor daemon on host node1. Starting taskexecutor daemon on host node2. Starting taskexecutor daemon on host node3.
注意: 使用jps命令查看 有可能发现没有Flink相关进程被启动!!! 因为在Flink1.8版本后,Flink官方提供的安装包里没有整合HDFS的jar。 解决办法: 下载如下jar包: flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar.tar.gz-Java文档类资源-CSDN下载flink1.14.0与hadoop3.x的兼容包,放在flink的lib目录下更多下载资源、学习资料请访问CSDN下载频道. 放到$FLINK_HOME/lib下面即可!!! 然后分发到node2、node3即可。 3.3 测试 1.访问WebUIhttp://192.168.56.10:8081/#/job-manager/config http://192.168.56.11:8081/#/job-manager/config
在实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下: -1.Yarn的资源可以按需使用,提高集群的资源利用率 -2.Yarn的任务有优先级,根据优先级运行作业 -3.基于Yarn调度系统,能够自动化地处理各个角色的 Failover(容错) ○ JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控 ○ 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器 ○ 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager 4.1.2 Flink如何和Yarn进行交互?1.Client上传jar包和配置文件到HDFS集群上 2.Client向Yarn ResourceManager提交任务并申请资源 3.ResourceManager分配Container资源并启动ApplicationMaster,然后AppMaster加载Flink的Jar包和配置构建环境,启动JobManager JobManager和ApplicationMaster运行在同一个container上。 一旦他们被成功启动,AppMaster就知道JobManager的地址(AM它自己所在的机器)。 它就会为TaskManager生成一个新的Flink配置文件(他们就可以连接到JobManager)。 这个配置文件也被上传到HDFS上。 此外,AppMaster容器也提供了Flink的web服务接口。 YARN所分配的所有端口都是临时端口,这允许用户并行执行多个Flink 4.ApplicationMaster向ResourceManager申请工作资源,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager 5.TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务 4.1.3 两种方式(1)Session模式 特点:需要事先申请资源,启动JobManager和TaskManger 优点:不需要每次递交作业申请资源,而是使用已经申请好的资源,从而提高执行效率 缺点:作业执行完成以后,资源不会被释放,因此一直会占用系统资源 应用场景:适合作业递交比较频繁的场景,小作业比较多的场景 (2)Per-Job模式 特点:每次递交作业都需要申请一次资源 优点:作业运行完成,资源会立刻被释放,不会一直占用系统资源 缺点:每次递交作业都需要申请资源,会影响执行效率,因为申请资源需要消耗时间 应用场景:适合作业比较少的场景、大作业的场景 4.2 操作 1.关闭yarn的内存检查 vim /data/soft/hadoop-3.3.1/etc/hadoop/yarn-site.xml添加: yarn.nodemanager.pmem-check-enabled false yarn.nodemanager.vmem-check-enabled false 说明:是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。 在这里面我们需要关闭,因为对于flink使用yarn模式下,很容易内存超标,这个时候yarn会自动杀掉job 2.同步 rsync -av /data/soft/hadoop-3.3.1 root@node2:/data/soft rsync -av /data/soft/hadoop-3.3.1 root@node3:/data/soft 3.重启yarn [root@node1 hadoop-3.3.1]# /data/soft/hadoop-3.3.1/sbin/stop-yarn.sh Stopping nodemanagers 上一次登录:六 11月 13 16:10:02 EST 2021从 192.168.56.1pts/1 上 Stopping resourcemanager 上一次登录:六 11月 13 16:46:03 EST 2021pts/0 上 [root@node1 hadoop-3.3.1]# /data/soft/hadoop-3.3.1/sbin/start-yarn.sh Starting resourcemanager 上一次登录:六 11月 13 16:46:07 EST 2021pts/0 上 Starting nodemanagers 上一次登录:六 11月 13 16:46:44 EST 2021pts/0 上
yarn-session.sh(开辟资源) + flink run(提交任务) 1.在yarn上启动一个Flink会话,node1上执行以下命令 /data/soft/flink-1.14.0/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d说明: 申请2个CPU、1600M内存 # -n 表示申请2个容器,这里指的就是多少个taskmanager # -tm 表示每个TaskManager的内存大小 # -s 表示每个TaskManager的slots数量 # -d 表示以后台程序方式运行 注意: 最后一个 block 没有足够的副本数,文件关闭失败 堆栈信息: java.io.IOException: Unable to close file because the last block does not have enough number of replicas. at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2528) at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2495) at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2458) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)解决办法: vim /data/soft/hadoop-3.3.1/etc/hadoop/core-site.xmlNameNode 收到 HDFS 客户端关闭文件请求之后,会检查最后一个 block 的完成状态,只有当足够的 DataNode 上报 block 完成才可关闭文件。 网络 IO 延迟和 GC 等原因都将导致 DataNode 延迟上报。 HDFS 客户端会多次尝试关闭文件,通过增加重试次数可减少该问题。 添加如下内容: dfs.client.block.write.locateFollowingBlock.retries 10该警告不用管 WARN org.apache.hadoop.hdfs.DFSClient - Caught exception java.lang.InterruptedException 4.3.2 Per-Job分离模式1.直接提交job /data/soft/flink-1.14.0/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /data/soft/flink-1.14.0/examples/batch/WordCount.jar# -m jobmanager的地址 # -yjm 1024 指定jobmanager的内存信息 # -ytm 1024 指定taskmanager的内存信息 注意: 最后一个 block 没有足够的副本数,文件关闭失败 堆栈信息: java.io.IOException: Unable to close file because the last block does not have enough number of replicas. at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2528) at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2495) at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2458) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)解决办法: vim /data/soft/hadoop-3.3.1/etc/hadoop/core-site.xmlNameNode 收到 HDFS 客户端关闭文件请求之后,会检查最后一个 block 的完成状态,只有当足够的 DataNode 上报 block 完成才可关闭文件。 网络 IO 延迟和 GC 等原因都将导致 DataNode 延迟上报。 HDFS 客户端会多次尝试关闭文件,通过增加重试次数可减少该问题。 添加如下内容: dfs.client.block.write.locateFollowingBlock.retries 10该警告不用管 WARN org.apache.hadoop.hdfs.DFSClient - Caught exception java.lang.InterruptedException 解决办法: 下载如下jar包: flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar.tar.gz-Java文档类资源-CSDN下载flink1.14.0与hadoop3.x的兼容包,放在flink的lib目录下更多下载资源、学习资料请访问CSDN下载频道. 放到$FLINK_HOME/lib下面即可!!! 跑完任务会报警告,警告如下: Exception in thread “Thread-8” java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration ‘classloader.check-leaked-classloader’. 解决方案: 在flink-conf.yaml中添加: classloader.check-leaked-classloader: false
|
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |