快速入门Flink(2) |
您所在的位置:网站首页 › pytorchyolov3环境搭建 › 快速入门Flink(2) |
node01(master+slave)、node02(slave) 、node03(slave) 1.3搭建集群实现步骤 解压 Flink 压缩包到指定目录配置 Flink配置 Slaves 节点分发 Flink 到各个节点启动集群递交 wordcount 程序测试查看 Flink WebUI 二、standalone模式环境搭建 将下载好的Flink安装包上传到指定目录解压Flink到 /export/server 目录 tar -zxvf flink-1.7.2-bin-hadoop26-scala_2.11.tgz 修改安装目录下 conf 文件夹内的 flink-conf.yaml 配置文件, 指定 JobManager (指定管理者) # 配置 Master 的机器名( IP 地址) node01 = 192.168.100.201 jobmanager.rpc.address: node01 # 配置每个 taskmanager 生成的临时文件夹 taskmanager.tmp.dirs: /export/servers/flink-1.7.2/tmp 修改安装目录下 conf 文件夹内的 slave 配置文件, 指定 TaskManager(指定工作节点) # node01 = 192.168.100.201、 node02 = 192.168.100.202、 node03 = 192.168.100.203 node01 node02 node03 配置系统环境变量配置文件(vim /etc/profile.d/flink.sh),添加 HADOOP_CONF_DIR 目录 export HADOOP_CONF_DIR=/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop 将flink.sh这个文件发送到其他的节点上 scp -r /etc/profile.d/flink.sh node02:/etc/profile.d/ scp -r /etc/profile.d/flink.sh node03:/etc/profile.d/ 每个节点重新加载环境变量 source /etc/profile 将配置好的 Flink 目录分发给其他的两台节点 for i in {2..3}; do scp -r flink-1.7.2/ node0$i:$PWD; done 启动 Flink 集群 bin/start-cluster.sh 通过 jps 查看进程信息 --------------------- node01 ---------------- 86583 Jps 85963 StandaloneSessionClusterEntrypoi nt 86446 TaskManagerRunner --------------------- node02 ---------------- 44099 Jps 43819 TaskManagerRunner --------------------- node03 ---------------- 29461 TaskManagerRunner 29678 Jps 通过Flink自带的webUI来查看![]() ![]() 从上述架构图中, 可发现 JobManager 存在单点故障, 一旦 JobManager 出现意外, 整 个集群无法工作。 所以, 为了确保集群的高可用, 需要搭建 Flink 的 HA。 ( 如果是 部署在 YARN 上, 部署 YARN 的 HA) , 我们这里演示如何搭建 Standalone 模式 HA。 2.3 HA 架构图node01(master+slave) node02(master+slave) node03(slave) 2.5实现步骤 在 flink-conf.yaml 中添加 zookeeper 配置将配置过的 HA 的 flink-conf.yaml 分发到另外两个节点分别到另外两个节点中修改 flink-conf.yaml 中的配置在 masters 配置文件中添加多个节点分发 masters 配置文件到另外两个节点启动 zookeeper 集群启动 flink 集群 2.6具体操作 在 flink-conf.yaml 中添加 zookeeper 配置 #开启 HA, 使用文件系统作为快照存储 state.backend: filesystem #默认为 none, 用于指定 checkpoint 的 data files 和 meta data 存储的目录 state.checkpoints.dir: hdfs://node01:8020/flink-checkpoints #默认为 none, 用于指定 savepoints 的默认目录 state.savepoints.dir: hdfs://node01:8020/flink-checkpoints #使用 zookeeper 搭建高可用 high-availability: zookeeper # 存储 JobManager 的元数据到 HDFS,用来恢复 JobManager 所需的所有元数据 high-availability.storageDir: hdfs://node01:8020/flink/ha/ high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181 将配置过的 HA 的 flink-conf.yaml 分发到另外两个节点 for i in {2..3}; do scp -r /export/servers/flink-1.7.2/conf/flink-conf.yaml node0$i:$PWD; done 到节点 2 中修改 flink-conf.yaml 中的配置, 将 JobManager 设置为自己节点的 名称 jobmanager.rpc.address: node02 在 masters 配置文件中添加多个节点 node01:8081 node02:8081 分发 masters 配置文件到另外两个节点 for i in {2..3}; do scp -r flink-1.7.2/ node0$i:$PWD; done 启动 zookeeper 集群 [root@node01 servers]# /export/servers/zookeeper-3.4.5-cdh5.14.0/bin/zkServer.sh start [root@node02 servers]# /export/servers/zookeeper-3.4.5-cdh5.14.0/bin/zkServer.sh start [root@node03 servers]# /export/servers/zookeeper-3.4.5-cdh5.14.0/bin/zkServer.sh start 启动 flink 集群 [root@node01 flink-1.7.2]# bin/start-cluster.sh Starting HA cluster with 2 masters. Starting standalonesession daemon on host node01.hadoop.com. Starting standalonesession daemon on host node02.hadoop.com. Starting taskexecutor daemon on host node01.hadoop.com. Starting taskexecutor daemon on host node02.hadoop.com. Starting taskexecutor daemon on host node03.hadoop.com 分别查看两个节点的 Flink Web UIkill 掉一个节点, 查看另外的一个节点的 Web UI 注意事项切记搭建 HA, 需要将第二个节点的 jobmanager.rpc.address 修改为 node02 三、yarn 集群环境在一个企业中, 为了最大化的利用集群资源, 一般都会在一个集群中同时运行多种类 型的 Workload。 因此 Flink 也支持在 Yarn 上面运行; flink on yarn 的前提是: hdfs、 yarn 均启动 3.1 准备工作 jdk1.8 及以上【 配置 JAVA_HOME 环境变量】ssh 免密码登录【 集群内节点之间免密登录】至少 hadoop2.2hdfs & yarn 3.2集群规划node01(master) node02(slave) node03(slave) 3.3修改 hadoop 的配置参数 vim etc/hadoop/yarn-site.xml yarn.nodemanager.vmem-check-enabled false是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则 直接将其杀 掉,默认是 true。 在这里面我们需要关闭,因为对于 flink 使用 yarn 模式下,很容易内存超标,这个时候 yarn 会自动杀掉 job。 3.4修改全局变量/etc/profile.d/flink.sh #添加: export HADOOP_CONF_DIR=/export/servers/hadoop/etc/Hadoop #YARN_CONF_DIR 或者 HADOOP_CONF_DIR 必须将环境变量设置为读取 YARN 和 HDFS 配置 3.5 Flink on Yarn 的运行机制
![]() ![]() yarn-session.sh(开辟资源)+flink run(提交任务) 这种模式下会启动 yarn session,并且会启动 Flink 的两个必要服务: JobManager 和 Task-managers,然后你可以向集群提交作业。同一个 Session 中可以提交多个 Flink 作业。需要注意的是,这种模式下 Hadoop 的版本至少 是 2.2,而且必须安装了 HDFS(因为启动 YARN session 的时候会向 HDFS 上 提交相关的 jar 文件和配置文件) 通过./bin/yarn-session.sh 脚本启动 YARN Session -n,--container 分配多少个 yarn 容器 (=taskmanager 的数量) Optional -D 动态属性 -d,--detached 独立运行 (以分离模式运行作业) -id,--applicationId YARN 集群上的任务 id,附着到一个后台运行的 yarn session 中 -j,--jar Path to Flink jar file -jm,--jobManagerMemory JobManager 的内存 [in MB] -m,--jobmanager 指定需要连接的 jobmanager(主节点)地址 ,使用这个参数可以指定一 个不同于配置文件中的 jobmanager -n,--container 分配多少个 yarn 容器 (=taskmanager 的数量) -nm,--name 在 YARN 上为一个自定义的应用设置一个名字 -q,--query 显示 yarn 中可用的资源 (内存, cpu 核数) -qu,--queue 指定 YARN 队列 -s,--slots 每个 TaskManager 使用的 slots 数量 -st,--streaming 在流模式下启动 Flink -tm,--taskManagerMemory 每个 TaskManager 的内存 [in MB] -z,--zookeeperNamespace 针对 HA 模式在 zookeeper 上创建 NameSpace注意: 如果不想让 Flink YARN 客户端始终运行,那么也可以启动分离的 YARN 会话。 该参数被称为 -d 或–detached。 启动: bin/yarn-session.sh -n 2 -tm 800 -s 1 -d上面的命令的意思是,同时向 Yarn 申请 3 个container(即便只申请了两个,因为ApplicationMaster和JobManager有一个额外的容器。一旦将Flink部署到YARN群集中,它就会显示JobManager的连接详细信息),其中2个Container启动TaskManager(-n2),每个TaskManager拥有1个TaskSlot(-s1),并且向每个TaskManager的Container申请800M的内存,以及一个ApplicationMaster(JobManager)。启动成功之后,去yarn页面:ip:8088 可以查看当前提交的 flink session。 ![]() ![]() ![]() 上面的 YARN session 是在 Hadoop YARN 环境下启动一个 Flink cluster 集群,里面的资源 是可以共享给其他的 Flink 作业。我们还可以在 YARN 上启 动一个 Flink 作业,这里我们还是使 用./bin/flink,但是不需要事先启动 YARN session. 使用 flink 直接提交任务 bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar以上命令在参数前加上 y 前缀,-yn 表示 TaskManager 个数 在 8088 页面观察: 注意 : 如果使用的 是 flink on yarn 方式,想切换回 standalone 模式的话, 需要删除文件: 【/tmp/.yarn-properties-root】 因为默认查找当前 yarn 集群中已有的 yarn-session 信息中的 jobmanager |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |