并行执行

您所在的位置:网站首页 flink连接多个数据源 并行执行

并行执行

2023-05-20 10:51| 来源: 网络整理| 查看: 265

并行执行 #

本节描述了在 Flink 中配置程序的并行执行。一个 Flink 程序由多个任务 task 组成(转换/算子、数据源和数据接收器)。一个 task 包括多个并行执行的实例,且每一个实例都处理 task 输入数据的一个子集。一个 task 的并行实例数被称为该 task 的 并行度 (parallelism)。

使用 savepoints 时,应该考虑设置最大并行度。当作业从一个 savepoint 恢复时,你可以改变特定算子或着整个程序的并行度,并且此设置会限定整个程序的并行度的上限。由于在 Flink 内部将状态划分为了 key-groups,且性能所限不能无限制地增加 key-groups,因此设定最大并行度是有必要的。

toc 设置并行度 #

一个 task 的并行度可以从多个层次指定:

算子层次 #

单个算子、数据源和数据接收器的并行度可以通过调用 setParallelism()方法来指定。如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream text = [...]; DataStream wordCounts = text .flatMap(new LineSplitter()) .keyBy(value -> value.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum(1).setParallelism(5); wordCounts.print(); env.execute("Word Count Example"); val env = StreamExecutionEnvironment.getExecutionEnvironment val text = [...] val wordCounts = text .flatMap{ _.split(" ") map { (_, 1) } } .keyBy(_._1) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum(1).setParallelism(5) wordCounts.print() env.execute("Word Count Example") env = StreamExecutionEnvironment.get_execution_environment() text = [...] word_counts = text .flat_map(lambda x: x.split(" ")) \ .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \ .key_by(lambda i: i[0]) \ .window(TumblingEventTimeWindows.of(Time.seconds(5))) \ .reduce(lambda i, j: (i[0], i[1] + j[1])) \ .set_parallelism(5) word_counts.print() env.execute("Word Count Example") 执行环境层次 #

如此节所描述,Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。

可以通过调用 setParallelism() 方法指定执行环境的默认并行度。如果想以并行度3来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); DataStream text = [...]; DataStream wordCounts = [...]; wordCounts.print(); env.execute("Word Count Example"); val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(3) val text = [...] val wordCounts = text .flatMap{ _.split(" ") map { (_, 1) } } .keyBy(_._1) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum(1) wordCounts.print() env.execute("Word Count Example") env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(3) text = [...] word_counts = text .flat_map(lambda x: x.split(" ")) \ .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \ .key_by(lambda i: i[0]) \ .window(TumblingEventTimeWindows.of(Time.seconds(5))) \ .reduce(lambda i, j: (i[0], i[1] + j[1])) word_counts.print() env.execute("Word Count Example") 客户端层次 #

将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。

在 CLI 客户端中,可以通过 -p 参数指定并行度,例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

在 Java/Scala 程序中,可以通过如下方式指定并行度:

try { PackagedProgram program = new PackagedProgram(file, args); InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123"); Configuration config = new Configuration(); Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader()); // set the parallelism to 10 here client.run(program, 10, true); } catch (ProgramInvocationException e) { e.printStackTrace(); } try { PackagedProgram program = new PackagedProgram(file, args) InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123") Configuration config = new Configuration() Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader()) // set the parallelism to 10 here client.run(program, 10, true) } catch { case e: Exception => e.printStackTrace } Python API 中尚不支持该特性。 系统层次 #

可以通过设置 ./conf/flink-conf.yaml 文件中的 parallelism.default 参数,在系统层次来指定所有执行环境的默认并行度。你可以通过查阅配置文档获取更多细节。

设置最大并行度 #

最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用 setParallelism() 方法修改并行度相似,你可以通过调用 setMaxParallelism() 方法来设定最大并行度。

默认的最大并行度等于将 operatorParallelism + (operatorParallelism / 2) 值四舍五入到大于等于该值的一个整型值,并且这个整型值是 2 的幂次方,注意默认最大并行度下限为 128,上限为 32768。

为最大并行度设置一个非常大的值将会降低性能,因为一些 state backends 需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。

从之前的作业恢复时,改变该作业的最大并发度将会导致状态不兼容。

Back to top



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3