【SparkCore】RDD常用方法以及使用 |
您所在的位置:网站首页 › sparkdefaultparallelism如何设置 › 【SparkCore】RDD常用方法以及使用 |
目录 创建RDD 1.由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等 2.通过已有的RDD经过算子转换生成新的RDD 3.由一个已经存在的Scala集合创建 RDD的方法/算子分类 Transformation转换算子 Action动作算子 RDD分区的数据取决的因素 常用API 创建RDD 查看该RDD的分区数量 map filter flatmap sortBy 并集(union) 交集(intersection) 差集(subtract) 笛卡尔积(cartesian) join leftOuterJoin rightOuterJoin groupbykey cogroup groupBy reduce reducebykey repartition collect count distinct top take first keys values mapValues collectAsMap 创建RDD 1.由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等 val rdd1 = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")2.通过已有的RDD经过算子转换生成新的RDD val rdd1 = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt") val rdd2=rdd1.flatMap(_.split(" ")) 3.由一个已经存在的Scala集合创建 val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8)) //makeRDD方法底层调用了parallelize方法 val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8)) RDD的方法/算子分类 RDD的算子分为两类 Transformation:转换操作,返回一个新的RDD Action:动作操作,返回值不是RDD(无返回值或返回其他的) 注意 RDD不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,传入什么函数) RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算。只有当发生一个要求返回结果给Driver的Action动作时,这些转换才会真正运行。之所以使用惰性求值/延迟执行,是因为这样可以在Action时对RDD操作形成DAG有向无环图进行Stage的划分和并行优化,这种设计让Spark更加有效率地运行。 Transformation转换算子 转换 含义 map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是 (Int, Interator[T]) => Iterator[U] sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子 union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活 join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD cartesian(otherDataset) 笛卡尔积 pipe(command, [envVars]) 对rdd进行管道操作 coalesce(numPartitions) 减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作 repartition(numPartitions) 重新给 RDD 分区 Action动作算子 动作 含义 reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的 collect() 在驱动程序中,以数组的形式返回数据集的所有元素 count() 返回RDD的元素个数 first() 返回RDD的第一个元素(类似于take(1)) take(n) 返回一个由数据集的前n个元素组成的数组 takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 takeOrdered(n, [ordering]) 返回自然顺序或者自定义顺序的前 n 个元素 saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 saveAsObjectFile(path) 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 foreach(func) 在数据集的每一个元素上,运行函数func进行更新。 foreachPartition(func) 在数据集的每一个分区上,运行函数func 统计操作
算子 含义 count 个数 mean 均值 sum 求和 max 最大值 min 最小值 variance 方差 sampleVariance 从采样中计算方差 stdev 标准差:衡量数据的离散程度 sampleStdev 采样的标准差 stats 查看统计结果 RDD分区的数据取决的因素 RDD分区的原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,这样可以充分利用CPU的计算资源,但是在实际中为了更加充分的压榨CPU的计算资源,会把并行度设置为cpu核数2~3倍。RDD分区数和启动时指定的核数、调用方法时指定的分区数、如文件本身分区数有关系
1.启动的时候指定的CPU核数确定了一个参数值: spark.default.parallelism=指定的CPU核数(集群模式最小2)
2.对于Scala集合调用parallelize(集合,分区数)方法 如果没有指定分区数,就使用spark.default.parallelism, 如果指定了就使用指定的分区数(不要指定大于spark.default.parallelism)
3.对于textFile(文件,分区数) 如果没有指定分区数sc.defaultMinPartitions=min(defaultParallelism,2) 如果指定了就使用指定的分区数sc.defaultMinPartitions=指定的分区数
4.对于本地文件 rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions)
5.对于HDFS文件 rdd的分区数 = max(hdfs文件的block数目,sc.defaultMinPartitions) 所以如果分配的核数为多个,且从文件中读取数据创建RDD,即使hdfs文件只有1个切片,最后的Spark的RDD的partition数也有可能是2 常用API 创建RDD val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))//makeRDD方法底层调用了parallelize方法val rdd2 = sc.makeRDD(List(5,6,4,7,3,8,2,9,1,10)) 查看该RDD的分区数量 //没有指定分区数,默认值是2 sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).partitions.length//指定了分区数为3 sc.parallelize(List(5,6,4,7,3,8,2,9,1,10),3).partitions.length//没有指定分区数,默认值是2 sc.textFile("hdfs://node01:8020/words.txt").partitions.length
map 对RDD中的每一个元素进行操作并返回操作的结果 val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10)) //collect方法表示收集,是action操作 rdd1.map(_ * 2).collectfilter 函数中返回True的被留下,返回False的被过滤掉 val rdd2 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10)) val rdd3 = rdd2.filter(_ >= 10) rdd3.collectflatmap 对RDD中的每一个元素进行先map再压扁,最后返回操作的结果 val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))//将rdd1里面的每一个元素先切分再压平 val rdd2 = rdd1.flatMap(_.split(' ')) rdd2.collectsortBy 排序 val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))// x=>x 表示按照元素本身进行排序,True表示升序 val rdd2 = rdd1.sortBy(x=>x,true) rdd2.collect//x=>x+""表示按照x的字符串形式排序变成了字符串,结果为字典顺序 val rdd2 = rdd1.sortBy(x=>x+"",true) rdd2.collect并集(union) val rdd1 = sc.parallelize(List(5, 6, 4, 3)) val rdd2 = sc.parallelize(List(1, 2, 3, 4))//union不会去重 求并集 val rdd3 = rdd1.union(rdd2) rdd3.collect//去重 rdd3.distinct.collect 交集(intersection) val rdd1 = sc.parallelize(List(5, 6, 4, 3)) val rdd2 = sc.parallelize(List(1, 2, 3, 4))//求交集 val rdd4 = rdd1.intersection(rdd2) rdd4.collect 差集(subtract) val rdd1 = sc.parallelize(List(5, 6, 4, 3)) val rdd2 = sc.parallelize(List(1, 2, 3, 4))//求差集 val rdd5 = rdd1.subtract(rdd2) rdd5.collect 笛卡尔积(cartesian) //学生 val rdd1 = sc.parallelize(List("jack", "tom"))//课程 val rdd2 = sc.parallelize(List("java", "python", "scala"))//笛卡尔积 //表示所有学生的所有选课情况 val rdd3 = rdd1.cartesian(rdd2) rdd3.collect join join(内连接)聚合具有相同key组成的value元组 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3))) val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7), ("tom", 2)))//聚合 val rdd3 = rdd1.join(rdd2) rdd3.collectleftOuterJoin 左外连接,左边的全留下,右边的满足条件的才留下 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3))) val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7), ("tom", 2)))//左外连接 val rdd4 = rdd1.leftOuterJoin(rdd2) rdd4.collectrightOuterJoin 右外连接,右边的全留下,左边的满足条件的才留下 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3))) val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7), ("tom", 2)))//右外连接 val rdd5 = rdd1.rightOuterJoin(rdd2) rdd5.collectgroupbykey 对具有相同键的值进行分组 val rdd6 = sc.parallelize(Array(("tom",1), ("jerry",2), ("kitty",3), ("jerry",9), ("tom",8), ("shuke",7), ("tom",2))) val rdd7=rdd6.groupByKey rdd7.collectcogroup cogroup是先RDD内部分组,在RDD之间分组 val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2))) val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) val rdd3 = rdd1.cogroup(rdd2) rdd3.collectgroupBy 根据指定的函数中的规则/key进行分组 val intRdd = sc.parallelize(List(1,2,3,4,5,6)) val result = intRdd.groupBy(x=>{if(x%2 == 0)"even" else "odd"}) result.collectreduce 聚合 val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))//reduce聚合 reduce是action val result = rdd1.reduce(_ + _)reducebykey 根据key,对相同key的元素进行操作 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1))) val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))//并集 val rdd3 = rdd1.union(rdd2) rdd3.collect//按key进行聚合 val rdd4 = rdd3.reduceByKey(_ + _) rdd4.collectrepartition 改变分区数 1.repartition可以增加和减少rdd中的分区数, 2.coalesce默认减少rdd分区数,增加rdd分区数不会生效。 3.不管增加还是减少分区数原rdd分区数不变,变的是新生成的rdd的分区数 //指定3个分区 val rdd1 = sc.parallelize(1 to 10,3)//减少分区 新生成的rdd分区数为2 rdd1.repartition(2).partitions.length//查看原来的分区 原来的rdd分区数不变 rdd1.partitions.length//增加分区 rdd1.repartition(4).partitions.length//减少分区 rdd1.coalesce(2).partitions.sizecollect 在驱动程序中,以数组的形式返回数据集的所有元素 val rdd1 = sc.parallelize(List(6,1,2,3,4,5), 2) rdd1.collect countcount统计集合中元素的个数 val rdd1 = sc.parallelize(List(6,1,2,3,4,5), 2) rdd1.countdistinct 去重 val rdd = sc.parallelize(Array(1,2,3,4,5,5,6,7,8,1,2,3,4), 3) rdd.distinct.collecttop 取出最大的前N个 val rdd1 = sc.parallelize(List(3,6,1,2,4,5)) rdd1.top(2)take 按照顺序取前N个 val rdd1 = sc.parallelize(List(3,6,1,2,4,5))//3 6 rdd1.take(2) //需求:取出最小的2个 rdd1.sortBy(x=>x,true).take(2)first 取第一个元素 val rdd1 = sc.parallelize(List(3,6,1,2,4,5)) rdd1.firstkeys 获取所有key val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val rdd2 = rdd1.map(x => (x.length, x)) rdd2.collectrdd2.keys.collectvalues 获取所有value val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val rdd2 = rdd1.map(x => (x.length, x)) rdd2.collectrdd2.values.collectmapValues mapValues表示对RDD中的元素进行操作,Key不变,Value变为操作之后 val rdd1 = sc.parallelize(List((1,10),(2,20),(3,30)))//_表示每一个value ,key不变,将函数作用于value val rdd2 = rdd1.mapValues(_*2).collectcollectAsMap 转换成Map val rdd = sc.parallelize(List(("a", 1), ("b", 2))) rdd.collectAsMap
|
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |