spark RDD编程 第3关:求平均值

您所在的位置:网站首页 求平均成绩的公式word spark RDD编程 第3关:求平均值

spark RDD编程 第3关:求平均值

2023-12-02 05:33| 来源: 网络整理| 查看: 265

任务描述

本关任务:编写Spark独立应用程序实现求平均值。 相关知识

为了完成本关任务,你需要掌握:RDD的创建;RDD的转换操作;RDD的行动操作。 RDD的创建

使用textFile()方法从本地文件系统中加载数据创建RDD,示例如下: val lines = sc.textFile("file:///home/hadoop/word.txt") 执行sc.textFile()方法以后,Spark从本地文件word.txt中加载数据到内存,在内存中生成一个RDD对象lines,这个RDD里面包含了若干个元素,每个元素的类型是String类型,也就是说,从word.txt文件中读取出来的每一行文本内容,都成为RDD中的一个元素。 使用map()函数转换得到相应的键值对RDD,示例如下: val lines = sc.textFile("file:///home/hadoop/word.txt") val pairRDD = lines.flatMap(line => line.split(" ")).map(word => (word,1)) 上面示例中,map(word=>(word,1))函数的作用是取出RDD中的每个元素,也就是每个单词,赋值给word,然后把word转换成(word,1)的键值对形式。 RDD的转换操作

对于RDD而言,每一次转换操作都会产生新的RDD,供给下一个操作使用。RDD的转换过程是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会触发真正的计算。常见的RDD转换操作如下所示:

**filter(func)**:筛选出满足函数func的元素,并返回一个新的RDD,示例如下: val lines = sc.textFile("file:///home/hadoop/word.txt") val linesWithSpark = lines.filter(line => line.contains("Spark")) **map(func)**:将每个元素传递到函数func中,并将结果返回为一个新的RDD,示例如下: val lines = sc.textFile("file:///home/hadoop/word.txt") val words = lines.map(line => line.split(" ")) **reduceByKey(func)**:应用于(K,V)键值对RDD时,返回一个新的(K, V)形式的RDD,其中每个值是将每个key传递到函数func中进行聚合后的结果,示例如下: val lines = sc.textFile("file:///home/hadoop/word.txt") val words = lines.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_) **mapValues(func)**:对键值对RDD中的每个value都应用一个函数,但是key不会发生变化,示例如下: val pairRDD = sc.parallelize(Array("Hadoop",3),("Spark",5),("Hive",2)) pairRDD.mapValues(x => x+1).foreach(println) 输出: (Hadoop,4) (Spark,6) (Hive,3) RDD的行动操作

对于RDD而言,只有遇到行动操作时才会执行“从头到尾”的真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。常见的RDD行动操作如下所示:

**count()**:返回RDD中元素的个数 **collect()**:以数组的形式返回RDD中的所有元素 **first()**:返回RDD中的第一个元素 **take(n)**:以数组的形式返回RDD中的前n个元素 **reduce(func)**:通过函数func(输入两个参数并返回一个值)聚合RDD中的元素 **foreach(func)**:将RDD中的每个元素传递到函数func中运行 下面通过一个示例来介绍上述行动操作,如下所示: val rdd = sc.parallelize(Array(1,2,3,4,5)) println(rdd.count) println(rdd.first) println(rdd.take(3).mkString(", ")) println(rdd.collect().mkString("-> ")) rdd.foreach(println) 输出: 5 1 1, 2, 3 1-> 2-> 3-> 4-> 5 1 2 3 4 5 计算图书平均销量的示例

给定一组键值对(“Spark”,2)、(“Hadoop”,6)、(“Hadoop”,4)、(“Spark”,6),键值对的key表示图书名称,value表示某天图书销量,现在需要计算每个键对应的平均值,也就是计算每种图书的平均销量,如下所示:

val rdd = sc.parallelize(Array(("Spark",2),("Hadoop",6),("Hadoop",4),("Spark",6))) val rdd1 = rdd.mapValues(x => (x, 1)) val rdd2 = rdd1.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) val rdd3 = rdd2.mapValues(x => (x._1 / x._2)) rdd3.foreach(println)

输出: (Spark,4) (Hadoop,5) 计算图书平均销量过程示意图 ####编程要求 每个输入文件表示学生某门课程的成绩,输入文件中每行内容由两个字段组成,第一个字段是学生姓名,第二个字段是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。 输入文件(AlgorithmScore)的样例如下: XiaoMing 92 XiaoHong 87 XiaoXin 82 XiaoLi 90 输入文件(DataBaseScore)的样例如下: XiaoMing 95 XiaoHong 81 XiaoXin 89 XiaoLi 85 输入文件(PythonScore)的样例如下: XiaoMing 82 XiaoHong 83 XiaoXin 94 XiaoLi 91 输出文件的样例如下: XiaoMing 89.67 XiaoXin 88.33 XiaoHong 83.67 XiaoLi 88.67

测试说明

本实训目前是基于Spark单机模式的运行方式,完成整个评测流程所需时间较长(全过程耗时约60秒),请耐心等待!

开始你的任务吧,祝你成功!

参考代码 import org.apache.spark.SparkContext import org.apache.spark.SparkConf object AvgScore { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("FileSort").setMaster("local") val sc = new SparkContext(conf) //输入文件AlgorithmScore.txt、DataBaseScore.txt和PythonScore.txt已保存在本地文件系统/root/step3_files目录中 val dataFile = "file:///root/step3_files" val data = sc.textFile(dataFile) /********** Begin **********/ //第一步:执行过滤操作,把空行丢弃。 val rdd1 = data.filter(_.trim().length > 0) //第二步:执行map操作,取出RDD中每个元素(即一行文本),以空格作为分隔符将一行文本拆分成两个字符串, //拆分后得到的字符串封装在一个数组对象中,成为新的RDD中一个元素。 var rdd2 = rdd1.map(line => line.split(" ")) //第三步:执行map操作,取出RDD中每个元素(即字符串数组),取字符串数组中第一个元素去除尾部空格, //取字符串数组中第二个元素去除尾部空格并转换成整数,并由这两部分构建一个(key, value)键值对。 val rdd3 = rdd2.map(t => (t(0).trim, t(1).trim.toInt)) //第四步:执行mapValues操作,取出键值对RDD中每个元素的value,使用x=>(x,1)这个匿名函数进行转换。 val rdd4 = rdd3.mapValues(x => (x, 1)) //第五步:执行reduceByKey操作,计算出每个学生所有课程的总分数和总课程门数。 val rdd5 = rdd4.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) //第六步:执行mapValues操作,计算出每个学生的平均成绩。 val rdd6 = rdd5.mapValues(x => (x._1.toDouble / x._2)) //第七步:执行collect操作,以数组的形式返回RDD中所有元素。 val rdd7 = rdd6.collect() //第八步:执行foreach操作,按如下格式打印出每个学生的平均成绩:姓名 成绩,其中成绩要求保留两位小数。 println("") //注意:此行不要修改,否则会影响测试结果,在此行之后继续完成第八步的代码。 rdd7.foreach(t => { val x = t._2 println(t._1 + " " + f"$x%1.2f") }) /********** End **********/ } }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3