spark中的RDD以及常用算子 |
您所在的位置:网站首页 › spark常用算子有哪些 › spark中的RDD以及常用算子 |
文章目录
1.RDD基本概念2.RDD的三种创建方式1. sc.parallelize 由一个已经存在的集合创建2. 由外部存储文件创建3. 由已有的RDD经过算子转换,生成新的RDD
3.RDD转换算子mapfilterflatMapmapPartitionssampleuniondistinctpartitionByreduceByKeygroupByKeyjoincoalescerepartitionmapValues
4.RDD行动算子reducecollectcountfirsttaketakeSamplesaveAsTextFilesaveAsSequenceFileforeachforeachPartition
RDD叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。 基于rdd的数据分析就是对rdd进行一系列的转换 不可变的,易于解决并发问题,每一次转换返回的都是新的rdd spark的数据在RDD里,通过一系列的转换算子达到数据分析的目的 hive的etl是对表进行数据分析,spark是对RDD进行etl mr是IO密集型,海量数据一次性处理,对于迭代运算效率不高,中间的结果要落地磁盘,所以IO太高,mr压根就没有交互式的功能 RDD的两种操作:转换操作和行动操作spark采用惰性计算 惰性计算的好处 整个式子全部列出来,谁先谁后可以优化:底层转换为DAG有向无环图, 这也是比mr快的原因,良好的规划了计算任务 spark比mr快的原因:计算基于内存、惰性计算有DAG有向无环图解决数据倾斜: 1.增加并行度,就是分区数多 2.先加一个随机前缀 ,聚合完了再去掉 3.改分区策略 不用hashpartition协同分区 协同分区:简单理解:分区数相同 分区器相同,没有shuffle 非协同分区 需要shuffer 2.RDD的三种创建方式 1. sc.parallelize 由一个已经存在的集合创建 val rdd1 = sc.parallelize(Array(1, 2, 3)) // 指定分区数为2 val rdd1 = sc.parallelize(Array(1, 2, 3), 2) val rdd1 = sc.makeRDD(arr, 2) // 查看分区数 rdd1.getNumPartitions // 若没有指定分区,默认就是启动的时候CPU的核数 2. 由外部存储文件创建包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等。 val rdd1 = sc.textFile("file:///h:/words.txt") 3. 由已有的RDD经过算子转换,生成新的RDD val rdd3=rdd2.flatMap(_.split(" ")) 3.RDD转换算子 map返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 val rdd = sc.makeRDD(Array(1, 2, 3)) val maparray = rdd.map(_ * 2).collect // Array[Int] = Array(2, 4, 6) filter返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 val rdd = sc.makeRDD(Array(1, 2, 3)) val filterarray = rdd.filter(x => x>2).collect // Array[Int] = Array(3) flatMap类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) val arr = Array("hello hive", "hello spark") var rdd = sc.makeRDD(arr, 2) val res = rdd.flatMap(x => x.split(" ")).collect // Array(hello, hive, hello, spark) mapPartitions类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区 迭代器进迭代器出 val rdd = sc.parallelize(List("zs"->"male", "lisi"->"male", "lili"->"female", "xh"->"female")) val res = rdd.mapPartitions( iter => { var woman = List[String]() while(iter.hasNext){ val next = iter.next next match { case (_, "male") => woman = next._1::woman case _ => println() } } woman.iterator } ) println(res.collect.toBuffer) // ArrayBuffer(lisi, zs) sample sample(withReplacement, fraction, seed)以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。例子从RDD中随机且有放回的抽出50%的数据,随机种子值为3(即可能以1 2 3的其中一个起始值) val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) // 有放回抽样,随机抽2个,随机种子1 val res = rdd.sample(true, 2, 1).collect // takesample 结果不是rdd union对源RDD和参数RDD求并集后返回一个新的RDD val rdd1 = sc.parallelize(1 to 5) val rdd2 = sc.parallelize(5 to 10) val res = rdd1.union(rdd2).collect // Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10) distinct对源RDD进行去重后返回一个新的RDD. 默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。 val add1 = sc.parallelize(List(1,2,1,5,2,9,6,1)) val res = add1.distinct().collect partitionBy调优的关键 对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD. 窄依赖:数据没有发生迁移 宽依赖。父rdd进入子rdd不同的分区 窄依赖可以并行处理, 宽依赖就是串行的计算,要等前面结果全部处理完了再进行下面结果 val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd"))) // 查看分区数 rdd.partitions.size // 4 var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2)) rdd2.partitions.size // 2 reduceByKey reduceByKey(func, [numTasks])在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-j42PapFS-1603763477128)(D762D2E2DC4F42419B3078F1C7DBD17A)] 先局部聚合再整体聚合 减少了IO和通讯的资源 groupbykey 直接拉过来了 val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2))) // val res = rdd.reduceByKey((x,y) => x+y).collect 不加 ()也行 val res = rdd.reduceByKey((x,y) => x+y).collect() // Array((female,6), (male,7)) groupByKeygroupByKey也是对每个key进行操作,但只生成一个sequence。 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))) val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6))) rdd.join(rdd1).collect() // Array((1,(a,4)), (2,(b,5)), (3,(c,6))) coalesce coalesce(numPartitions)缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。 val rdd = sc.parallelize(1 to 16,4) rdd.partitions.size // 4 val coalesceRDD = rdd.coalesce(3) coalesceRDD.partitions.size // 3 repartition根据分区数,重新通过网络随机洗牌所有数据。 val rdd = sc.parallelize(1 to 16,4) rdd.partitions.size // 4 val rerdd = rdd.repartition(2) rerdd.partitions.size // 2 val rerdd = rdd.repartition(4) rerdd.partitions.size // 4 mapValues针对于(K,V)形式的类型只对V进行操作 val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c"))) rdd3.mapValues(_+"|||").collect() // Array((1,a|||), (1,d|||), (2,b|||), (3,c|||)) 4.RDD行动算子 reduce通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的 val rdd1 = sc.makeRDD(1 to 10,2) rdd1.reduce(_+_) // 55 val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5))) rdd2.reduce((x,y)=>(x._1 + y._1, x._2 + y._2)) // (String, Int) = (adca,12) collect在驱动程序中,以数组的形式返回数据集的所有元素 var rdd1 = sc.makeRDD(1 to 10,2) rdd1.collect() count返回RDD的元素个数 var rdd = sc.makeRDD(1 to 10,2) rdd.count() // 10 first返回RDD的第一个元素(类似于take(1)) var rdd = sc.makeRDD(1 to 10,2) rdd.first() // 1 take返回一个由数据集的前n个元素组成的数组 var rdd = sc.makeRDD(1 to 10,2) rdd.take(5) // Array(1, 2, 3, 4, 5) takeSample takeSample(withReplacement,num, [seed])返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 var rdd =sc.parallelize(1 to 10,2) rdd.collect() // Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) rdd.takeSample(true,5,3) // Array(3, 5, 5, 9, 7) saveAsTextFile将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 foreach(func) foreachPartition() 能够将数据保存到外部设备 Mysql Redis等 var rdd = sc.makeRDD(1 to 10,2) var sum = sc.accumulator(0) rdd.foreach(sum+=_) sum.value // 55 rdd.collect().foreach(println) // 1 2 3 4 5 6 7 8 9 10 foreachPartition |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |