spark 广播变量和累加器使用和原理

您所在的位置:网站首页 list累加 spark 广播变量和累加器使用和原理

spark 广播变量和累加器使用和原理

#spark 广播变量和累加器使用和原理| 来源: 网络整理| 查看: 265

使用

通常,当传递给Spark算子(比如map或reduce)函数在远程集群节点上执行时,它在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台服务器上,对远程服务器上变量的任何更新都不会传播回driver程序。通常支持跨Tasks的读写共享变量性能比较低。也就是说如果在一个算子函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中。此时每个task只能操作自己的那份变量副本。如果多个task想要共享某个变量,那么这种方式是做不到的。

然而,Spark确实为两种常见的使用模型提供了两种有限类型的共享变量:广播变量(broadcast variable)和累加器(accumulator)。

1. 广播变量

使用场景

假如在spark程序里需要用到大对象,比如:字典,黑白名单等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗Executor服务器上的资源,如果将这个变量声明为广播变量,那么只是每个Executor拥有一份,这个Executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。

图示

spark 广播变量和累加器使用和原理

使用方式

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)scala> broadcastVar.valueres0: Array[Int] = Array(1, 2, 3)

注意事项

1)变量一旦被定义为一个广播变量,那么这个变量只能读,不能修改2)不能将一个RDD使用广播变量广播出去,因为RDD是不存储数据的。可以将RDD的结果广播出去。3)广播变量只能在Driver端定义,不能在Executor端定义。4)在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。5)如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。6)如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本

实现原理

代码实现详见:

org.apache.spark.broadcast.TorrentBroadcast

这里大体讲述下broadcast variable的读写概述:

1) driver端执行sparkContext.broadcast(value),会将存储对象进行压缩、序列化,然后默认以4m大小分成多个block,并将这些block存储到driver端的blockManager中,存储方式为MEMORY_AND_DISK_SER,并返回TorrentBroadcast对象。2) 在每个executor上,executor使用到broadcast对象时,会尝试从当前executor的blockManager中获取数据,若不存在,则远程从driver或其他executor(如果可用)中获取对象block,一旦获取到block,它就会将block存放到broadcastCache中,为其他executor来获取数据做好准备。3) 通过这种方式,可以防止driver成为(向每个executor)发送广播副本的瓶颈。如果driver向每个executor都要发送广播副本则会导致driver网络带宽成为瓶颈,效率也会比较低

该文章也讲解的不错:

https://zhuanlan.zhihu.com/p/158894710

2. 累加器

使用场景

常用作计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会在driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。

图示

spark 广播变量和累加器使用和原理

使用方式

scala> val accum = sc.longAccumulator(“My Accumulator”)accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))…

10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 sscala> accum.valueres2: Long = 10

注意事项

1)累加器只能在Driver端定义和初始化,在Executor端更新,不能在Executor端定义,不能在Executor端(.value)获取值

实现原理

代码详见:

org.apache.spark.util.AccumulatorV2

这里大体讲述下实现原理:

1) sc.longAccumulator(“xxx”)方法会初始化AccumulatorMetadata,包括唯一id和name,并在AccumulatorContext进行accumulator的注册(注册的目的是task结束后,accumulator合并的时候可以找得到)。2) executor端的task执行到相关的rdd方法的时候,会触发闭包的传输,序列化机制中会将是否在driver端的标识改为false,并对当前的accumulator执行copyAndReset(),保证task收到的是一个干净的accumulator,并在taskContext中注册该accumulator。task就可以执行add()方法进行累加操作了。3) driver端的DagScheduler收到了task完成的事件后,会把收到的事件对象中的各个累加器合并到AccumulatorContext中注册好的主累加器中,这样用户在driver端调用longAccumulator.value()的时候,拿到的就是已经处理好后的累加值。

Original: https://blog.csdn.net/chanyue123/article/details/123175776Author: sf_wwwTitle: spark 广播变量和累加器使用和原理

相关阅读 Title: Java基础——Collections工具类

参考操作数组的工具类:Arrays。

当Collections工具类的参数列表是Collection时,表示参数可以是set或者List,如果明确表明了参数列表是List或者set则不能传入另外一种参数。

Collections 是一个操作 Set、List 和 Map 等集合的工具类。Collections 中提供了一系列静态的方法对集合元素进行排序、查询和修改等操作,还提供了对集合对象设置不可变、对集合对象实现同步控制等方法:

public static  Listlist=new ArrayList(); list.add("张三"); Collections.addAll(list,"李四","王五"); System.out.println(list); public static int i = Collections.binarySearch(list, "张三"); System.out.println(i); public static    Collections.binarySearch(list, "王五", new Comparator() {  public static System.out.println(Collections.max(list)); public static  Collections.max(list, new Comparator() { 

public static

System.out.println(Collections.min(list));

public static

 Collections.min(list, new Comparator() { 

* public static void reverse(List list)反转指定列表List中元素的顺序。*

Collections.reverse(list); System.out.println(list); public static void shuffle(List list) List 集合元素进行随机排序,类似洗牌 Collections.shuffle(list); System.out.println(list); public static Collections.sort(list); public static  Collections.sort(list, new Comparator() {  public static void swap(List list,int i,int j)将指定 list 集合中的 i 处元素和 j 处元素进行交换 Collections.swap(list,0,2); System.out.println(list); public static int frequency(Collection c,Object o)返回指定集合中指定元素的出现次数 System.out.println(list); list.add("王五"); System.out.println(Collections.frequency(list, "王五")); public static 注意,1、新集合dest长度必须大于等于所要复制的src集合长度,不然会报Source does not fit in dest 2、如果新集合dest中有元素,则src中的元素会替换掉它们,如果dest长度必须大于复制的src集合长度时,只会替换对应下标的元素 List list1=new ArrayList(); Collections.addAll(list1,"1","2","3","4"); System.out.println(list1); Collections.copy(list1,list); System.out.println(list1); public static 用参数列表的第二个参数替换掉集合中所有的第一个参数 Collections.replaceAll(list1,"王五","张七"); System.out.println(list1); Collections 类中提供了多个 synchronizedXxx() 方法,该方法可使将指定集合包装成线程同步的集合,从而可以解决多线程并发访问集合时的线程安全问题 Collections类中提供了多个unmodifiableXxx()方法,该方法返回指定 Xxx的不可修改的视图。

Original: https://www.cnblogs.com/CYan521/p/16435517.htmlAuthor: 再美不及姑娘你Title: Java基础——Collections工具类

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/394872/

转载文章受原作者版权保护。转载请注明原作者出处!



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3