spark中的RDD以及常用算子

您所在的位置:网站首页 spark常用算子有哪些 spark中的RDD以及常用算子

spark中的RDD以及常用算子

2024-05-26 01:20| 来源: 网络整理| 查看: 265

文章目录 1.RDD基本概念2.RDD的三种创建方式1. sc.parallelize 由一个已经存在的集合创建2. 由外部存储文件创建3. 由已有的RDD经过算子转换,生成新的RDD 3.RDD转换算子mapfilterflatMapmapPartitionssampleuniondistinctpartitionByreduceByKeygroupByKeyjoincoalescerepartitionmapValues 4.RDD行动算子reducecollectcountfirsttaketakeSamplesaveAsTextFilesaveAsSequenceFileforeachforeachPartition

在这里插入图片描述

1.RDD基本概念

RDD叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。

基于rdd的数据分析就是对rdd进行一系列的转换

不可变的,易于解决并发问题,每一次转换返回的都是新的rdd

spark的数据在RDD里,通过一系列的转换算子达到数据分析的目的

hive的etl是对表进行数据分析,spark是对RDD进行etl

mr是IO密集型,海量数据一次性处理,对于迭代运算效率不高,中间的结果要落地磁盘,所以IO太高,mr压根就没有交互式的功能

RDD的两种操作:转换操作和行动操作spark采用惰性计算 惰性计算的好处 整个式子全部列出来,谁先谁后可以优化:底层转换为DAG有向无环图, 这也是比mr快的原因,良好的规划了计算任务 spark比mr快的原因:计算基于内存、惰性计算有DAG有向无环图

解决数据倾斜:

1.增加并行度,就是分区数多 2.先加一个随机前缀 ,聚合完了再去掉 3.改分区策略 不用hashpartition

协同分区

协同分区:简单理解:分区数相同 分区器相同,没有shuffle 非协同分区 需要shuffer 2.RDD的三种创建方式 1. sc.parallelize 由一个已经存在的集合创建 val rdd1 = sc.parallelize(Array(1, 2, 3)) // 指定分区数为2 val rdd1 = sc.parallelize(Array(1, 2, 3), 2) val rdd1 = sc.makeRDD(arr, 2) // 查看分区数 rdd1.getNumPartitions // 若没有指定分区,默认就是启动的时候CPU的核数 2. 由外部存储文件创建

包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等。

val rdd1 = sc.textFile("file:///h:/words.txt") 3. 由已有的RDD经过算子转换,生成新的RDD val rdd3=rdd2.flatMap(_.split(" ")) 3.RDD转换算子 map

返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

val rdd = sc.makeRDD(Array(1, 2, 3)) val maparray = rdd.map(_ * 2).collect // Array[Int] = Array(2, 4, 6) filter

返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

val rdd = sc.makeRDD(Array(1, 2, 3)) val filterarray = rdd.filter(x => x>2).collect // Array[Int] = Array(3) flatMap

类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

val arr = Array("hello hive", "hello spark") var rdd = sc.makeRDD(arr, 2) val res = rdd.flatMap(x => x.split(" ")).collect // Array(hello, hive, hello, spark) mapPartitions

类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区

迭代器进迭代器出

val rdd = sc.parallelize(List("zs"->"male", "lisi"->"male", "lili"->"female", "xh"->"female")) val res = rdd.mapPartitions( iter => { var woman = List[String]() while(iter.hasNext){ val next = iter.next next match { case (_, "male") => woman = next._1::woman case _ => println() } } woman.iterator } ) println(res.collect.toBuffer) // ArrayBuffer(lisi, zs) sample sample(withReplacement, fraction, seed)

以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。例子从RDD中随机且有放回的抽出50%的数据,随机种子值为3(即可能以1 2 3的其中一个起始值)

val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) // 有放回抽样,随机抽2个,随机种子1 val res = rdd.sample(true, 2, 1).collect // takesample 结果不是rdd union

对源RDD和参数RDD求并集后返回一个新的RDD

val rdd1 = sc.parallelize(1 to 5) val rdd2 = sc.parallelize(5 to 10) val res = rdd1.union(rdd2).collect // Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10) distinct

对源RDD进行去重后返回一个新的RDD. 默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。

val add1 = sc.parallelize(List(1,2,1,5,2,9,6,1)) val res = add1.distinct().collect partitionBy

调优的关键

对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD.

窄依赖:数据没有发生迁移 宽依赖。父rdd进入子rdd不同的分区 窄依赖可以并行处理, 宽依赖就是串行的计算,要等前面结果全部处理完了再进行下面结果 val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd"))) // 查看分区数 rdd.partitions.size // 4 var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2)) rdd2.partitions.size // 2 reduceByKey reduceByKey(func, [numTasks])

在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-j42PapFS-1603763477128)(D762D2E2DC4F42419B3078F1C7DBD17A)]

先局部聚合再整体聚合 减少了IO和通讯的资源

groupbykey 直接拉过来了

val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2))) // val res = rdd.reduceByKey((x,y) => x+y).collect 不加 ()也行 val res = rdd.reduceByKey((x,y) => x+y).collect() // Array((female,6), (male,7)) groupByKey

groupByKey也是对每个key进行操作,但只生成一个sequence。

在这里插入图片描述

val words = Array("one", "two", "two", "three", "three", "three") val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) val group = wordPairsRDD.groupByKey().collect() // Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1))) val group2 = group.map(t => (t._1, t._2.sum)) // Array((two,2), (one,1), (three,3)) join join(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))) val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6))) rdd.join(rdd1).collect() // Array((1,(a,4)), (2,(b,5)), (3,(c,6))) coalesce coalesce(numPartitions)

缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。

val rdd = sc.parallelize(1 to 16,4) rdd.partitions.size // 4 val coalesceRDD = rdd.coalesce(3) coalesceRDD.partitions.size // 3 repartition

根据分区数,重新通过网络随机洗牌所有数据。

val rdd = sc.parallelize(1 to 16,4) rdd.partitions.size // 4 val rerdd = rdd.repartition(2) rerdd.partitions.size // 2 val rerdd = rdd.repartition(4) rerdd.partitions.size // 4 mapValues

针对于(K,V)形式的类型只对V进行操作

val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c"))) rdd3.mapValues(_+"|||").collect() // Array((1,a|||), (1,d|||), (2,b|||), (3,c|||)) 4.RDD行动算子 reduce

通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的

val rdd1 = sc.makeRDD(1 to 10,2) rdd1.reduce(_+_) // 55 val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5))) rdd2.reduce((x,y)=>(x._1 + y._1, x._2 + y._2)) // (String, Int) = (adca,12) collect

在驱动程序中,以数组的形式返回数据集的所有元素

var rdd1 = sc.makeRDD(1 to 10,2) rdd1.collect() count

返回RDD的元素个数

var rdd = sc.makeRDD(1 to 10,2) rdd.count() // 10 first

返回RDD的第一个元素(类似于take(1))

var rdd = sc.makeRDD(1 to 10,2) rdd.first() // 1 take

返回一个由数据集的前n个元素组成的数组

var rdd = sc.makeRDD(1 to 10,2) rdd.take(5) // Array(1, 2, 3, 4, 5) takeSample takeSample(withReplacement,num, [seed])

返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子

var rdd =sc.parallelize(1 to 10,2) rdd.collect() // Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) rdd.takeSample(true,5,3) // Array(3, 5, 5, 9, 7) saveAsTextFile

将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

在这里插入图片描述

saveAsSequenceFile

将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。

在这里插入图片描述

foreach

foreach(func) foreachPartition() 能够将数据保存到外部设备 Mysql Redis等

var rdd = sc.makeRDD(1 to 10,2) var sum = sc.accumulator(0) rdd.foreach(sum+=_) sum.value // 55 rdd.collect().foreach(println) // 1 2 3 4 5 6 7 8 9 10 foreachPartition


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3