Spark(RDD)转换操作

您所在的位置:网站首页 missing怎样读 Spark(RDD)转换操作

Spark(RDD)转换操作

2024-04-18 12:06| 来源: 网络整理| 查看: 265

aggregateByKey函数

功能:根据设定规则同时进行分区间的计算和分区内的计算,具体为,(1)在分区内按照相同的key进行某种计算,分区内部计算完后,接着计算分区间的(2)同样依据相同的key按照规则进行计算。

注意:数据类型为键值对类型,aggregateByKey函数有两个参数列表,即:aggregateByKey(参数)(参数1,参数2),其中:

参数:在进行分区内计算时,依据分区内的计算规则,设置一个初始值,进行第一步的计算。参数1:分区内的计算规则参数2:分区间的计算规则

实例1

import org.apache.spark.{SparkConf, SparkContext} object aggregateByKey { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("operator") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(("a", 1), ("a", 2),("a", 4),("a", 6)),2) rdd.aggregateByKey(0)( (x,y) => math.max(x,y), (x,y) => x + y ).collect().foreach(println) sc.stop() } } 结果: (a,8)

计算步骤:

分区1为:("a", 1), ("a", 2),分区2为:("a", 4),("a", 6)

分区内计算:相同key的a的最大值为2,相同的key的b的最大值为6,分区间计算:相同key为a,进行区间的计算,返回结果2+6=8。

实例2

import org.apache.spark.{SparkConf, SparkContext} object aggregateByKey { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("operator") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(("a", 1), ("a", 2),("b", 4),("b", 6)),2) rdd.aggregateByKey(0)( (x,y) => math.max(x,y), (x,y) => x + y ).collect().foreach(println) sc.stop() } } 结果: (b,6) (a,2)

计算步骤:

分区1为:("a", 1), ("a", 2),分区2为:("b", 4),("b", 6)

分区内计算:相同key的a的最大值为2,相同的key的b的最大值为6,分区间计算:由于没有相同的key,则不进行计算,返回结果(b,6) (a,2)。

特殊情况:

分区内和分区间的计算规则一样时,其一般使用foldByKey函数,aggregateByKey函数和foldByKey函数有联系。

import org.apache.spark.{SparkConf, SparkContext} object aggregateByKey_3 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("operator") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(("a", 1), ("a", 2),("a", 4),("a", 6)),2) rdd.aggregateByKey(0)( (x,y) => x + y, (x,y) => x + y ).collect().foreach(println) //上面的简写 rdd.aggregateByKey(0)(_+_,_+_).collect().foreach(println) sc.stop() } } 结果: (a,13) (a,13)进阶:

aggregateByKey最终的返回数据结果应该和初始值的类型保持一致

import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark18_RDD_Operator_Transform3 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - (Key - Value类型) val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ),2) // //val aggRDD: RDD[(String, String)] = rdd.aggregateByKey("")(_ + _, _ + _) //aggRDD.collect.foreach(println) // 获取相同key的数据的平均值 => (a, 3),(b, 4) val newRDD : RDD[(String, (Int, Int))] = rdd.aggregateByKey( (0,0) )( ( t, v ) => { (t._1 + v, t._2 + 1) }, (t1, t2) => { (t1._1 + t2._1, t1._2 + t2._2) } ) val resultRDD: RDD[(String, Int)] = newRDD.mapValues { case (num, cnt) => { num / cnt } } resultRDD.collect().foreach(println) sc.stop() } }



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3