spark广播变量的原理

您所在的位置:网站首页 spark广播机制 spark广播变量的原理

spark广播变量的原理

2023-07-18 17:35| 来源: 网络整理| 查看: 265

e213f116ebe2cb2c4c773686b58a9b85.png spark 中的累加器(accumulator) 和广播变量(broadcast variable) 都是共享变量(所谓共享,就是在驱动器程序和工作节点之间共享) 累加器用于对信息进行聚合广播变量用于高效的分发较大的对象 一、累加器 在集群中执行代码时,一个难点是:理解变量和方法的范围、生命周期。下面是一个闭包的例子:counter = 0rdd = sc.parallelize(data)​def increment_counter(x): global counter counter += xrdd.foreach(increment_counter)print("Counter value: ", counter)上述代码的行为是不确定的,并且无法按照预期正常工作。在执行作业时,spark 会分解RDD 操作到每个executor 的task 中。在执行之前,spark 计算任务的闭包 所谓闭包:指的是executor 要在RDD 上进行计算时,必须对执行节点可见的那些变量和方法闭包被序列化,并被发送到每个executor

3.在上述代码中,闭包的变量的副本被发送给每个executor,当counter 被foreach 函数引用时,它已经不再是驱动器节点的counter 了

虽然驱动器程序中,仍然有一个counter 在内存中;但是对于executors ,它是不可见的。executor 看到的只是序列化的闭包的一个副本。所有对counter 的操作都是在executor 的本地进行。要想正确实现预期目标,则需要使用累加器

4.累加器相当于集群的统筹变量

1.1 Accumulator 一个累加器(Accumulator)变量只支持累加操作 工作节点和驱动器程序对它都可以执行+= 操作,但是只有驱动器程序可以访问它的值。在工作节点上,累加器对象看起来就像是一个只写的变量工作节点对它执行的任何累加,都将自动的传播到驱动器程序中。 SparkContext 的累加器变量只支持基本的数据类型,如int、float 等。 你可以通过AccumulatorParam 来实现自定义的累加器 Accumulator 的方法: .add(term):向累加器中增加值term Accumulator 的属性: .value:获取累加器的值。只可以在驱动器程序中使用 通常使用累加器的流程为: 在驱动器程序中调用SparkContext.accumulator(init_value) 来创建出带有初始值的累加器在执行器的代码中使用累加器的+= 方法或者.add(term) 方法来增加累加器的值在驱动器程序中使用累加器的.value 属性来访问累加器的值

示例:file=sc.textFile('xxx.txt')acc=sc.accumulator(0)def xxx(line): global acc #访问全局变量 if yyy: acc+=1 return zzzrdd=file.map(xxx)

1.2 累加器与容错性 spark 中同一个任务可能被运行多次: 如果工作节点失败了,则spark 会在另一个节点上重新运行该任务如果工作节点处理速度比别的节点慢很多,则spark 也会抢占式的在另一个节点上启动一个投机性的任务副本甚至有时候spark 需要重新运行任务来获取缓存中被移出内存的数据 当spark 同一个任务被运行多次时,任务中的累加器的处理规则: 在行动操作中使用的累加器,spark 确保每个任务对各累加器修改应用一次 因此:如果想要一个无论在失败还是重新计算时,都绝对可靠的累加器,我们必须将它放在foreach() 这样的行动操作中在转化操作中使用的累加器,无法保证只修改应用一次。 转化操作中累加器可能发生不止一次更新在转化操作中,累加器通常只用于调试目的 二、广播变量 广播变量可以让程序高效的向所有工作节点发送一个较大的只读值spark 会自动的把闭包中所有引用到的变量都发送到工作节点上。虽然这很方便,但是也很低效。原因有二: 默认的任务发射机制是专门为小任务进行优化的事实上,你很可能在多个并行操作中使用同一个变量。但是spark 会为每个操作分别发送。

注意:1不能将RDD广播出去,可以将RDD的结果广播出去

2.广播变量只能在Driver端定义,在Executor端使用,Executor端不能

2.1 Broadcast Broadcast 变量的value 中存放着广播的值,该值只会被发送到各节点一次Broadcast 的方法: .destroy():销毁当前Broadcast 变量的所有数据和所有metadata。 注意:一旦一个Boradcast 变量被销毁,那么它就再也不能被使用该方法将阻塞直到销毁完成.dump(value,f):保存Broadcast 变量.load(path):加载Broadcast 变量.unpersist(blocking=False):删除Broadcast 变量在executor 上的缓存备份。 如果在此之后,该Broadcast 被使用,则需要从驱动器程序重新发送Broadcast 变量到executor参数: blocking:如果为True,则阻塞直到unpersist 完成 属性: .value:返回Broadcast 变量的值 使用Broadcast 的流程: 通过SparkContext.broadcast(xx) 创建一个Broadcast 变量通过.value 属性访问该对象的值该变量只会被发送到各节点一次,应该作为只读值来处理(修改这个值并不会影响到其他的节点)

示例:bd=sc.broadcast(tuple('name','json'))def xxx(row): s=bd.value[0]+row return srdd=rdd.map(xxx)

2.2 广播的优化 当广播一个较大的值时,选择既快又好的序列化格式非常重要 如果序列化对象的时间较长,或者传送花费的时间太久,则这个时间很容易成为性能瓶颈

2.spark 中的Java API 和 Scala API 默认使用的序列化库为Java 序列化库,它对于除了基本类型的数组以外的任何对象都比较低效。

你可以使用spark.serializer 属性来选择另一个序列化库来优化序列化过程


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3