Spark基础【RDD单Value类型转换算子】

您所在的位置:网站首页 spark的算子类型 Spark基础【RDD单Value类型转换算子】

Spark基础【RDD单Value类型转换算子】

2024-07-15 06:21| 来源: 网络整理| 查看: 265

文章目录 RDD转换算子算子单Value类型1 map(1)案例:从服务器日志数据agent.log中获取第四列数据 2 mapPartitions(1)map和mapPartitions的区别(2)java克隆浅复制(3)案例:获取每个数据分区的最大值 3 mapPartitionsWithIndex4 flatMap(1)案例:将List(List(1,2),3,List(4,5))进行扁平化操作 5 glom(1)案例:计算所有分区最大值求和(分区内取最大值,分区间最大值求和) 6 groupBy(1)案例:将List("Hello","hive", "hbase", "Hadoop")根据单词首写字母进行分组。(2)案例:按照agent.log中的第二列数据分组,相同求和

RDD转换算子

RDD的方法有很多,但一般分为两大类,第一类是逻辑的封装,将旧的逻辑转换为新的逻辑,称之为转换算子;第二类是执行逻辑,将封装好的逻辑进行执行,让整个作业运行起来,称之为行动算子

算子

问题(初始)–> operator(算子,操作,方法) –> 问题(解决,完成)

RDD根据数据处理方式的不同将算子整体上分为单Value类型、双Value类型和Key-Value类型

将RDD的方法称为算子的原因是与Scala集合的方法进行区分

如以下代码中的两个foreach方法,第一个为scala集合(单点)中的方法,第二个为RDD(分布式)的方法

val wordCount: RDD[(String, Int)] = sc.textFile("data/word.txt").map((_,1)).reduceByKey(_ + _) val array: Array[(String, Int)] = wordCount.collect() array.foreach(println) wordCount.foreach(println) 单Value类型 1 map

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换

def map[U: ClassTag](f: T => U): RDD[U]

map算子表示将数据源中的每一条数据进行处理

map算子的参数是函数类型:Int => U,输入Int类型的数据,输出类型不确定

“转换”概念的体现过程:从rdd算子转换成了newRdd算子

def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("WordCount") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List(1,2,3,4)) def mapFunction ( num : Int) : Int = { num * 2 } val newRdd: RDD[Int] = rdd.map(mapFunction) newRdd.collect().foreach(println) sc.stop() }

每次写mapFunction有些麻烦,可以使用函数至简原则

val newRdd: RDD[Int] = rdd.map(_ * 2)

转换之后如何分区,数据执行的顺序如何

查看newRdd分区数量(2个)

newRdd.saveAsTextFile("output")

在RDD进行转换时,新的RDD和旧的RDD的分区数量保持一致,源码如下,返回所有依赖的RDD中的第一个RDD的分区数量

/** Returns the first parent RDD */ protected[spark] def firstParent[U: ClassTag]: RDD[U] = { dependencies.head.rdd.asInstanceOf[RDD[U]] }

数据在处理过程中,默认情况下,分区不变,原来数据在哪个分区,转换完成之后还是在哪里

数据在处理过程中,要遵循执行顺序:分区内有序,分区间无序

使用如下代码查看是否满足分区内有序,分区间无序的规则

val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount") val newRdd: RDD[Int] = rdd.map( num => { println("num = " + num) num * 2 } )

RDD其实就是封装的逻辑,如果有多个RDD,那么第一条数据应该所有的逻辑执行完毕,再执行下一条数据,RDD没有等待的功能

val newRdd1: RDD[Int] = rdd.map( num => { println("############### num = " + num) num * 2 } ) val newRdd2: RDD[Int] = newRdd1.map( num => { println("*************** num = " + num) num * 2 } ) (1)案例:从服务器日志数据agent.log中获取第四列数据

部分数据如下

1516609143867 6 7 64 16 1516609143869 9 4 75 18 1516609143869 1 7 87 12 1516609143869 2 8 92 9 1516609143869 6 7 84 24 def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount") val sc = new SparkContext(conf) val lineRDD: RDD[String] = sc.textFile("data/agent.log") val forthRDD: RDD[String] = lineRDD.map( line => { val datas = line.split(" ") datas(3) } ) forthRDD.collect().foreach(println) sc.stop() } 2 mapPartitions

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,Iterator[T] => Iterator[U],数据量可以增加

def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] ) val rdd = sc.makeRDD(List(1,2,3,4),2) val rdd1 = rdd.mapPartitions( list => { println("*************") list.map(_ * 2) } )

将分区内的数据先放在Executor中(缓存),增加效率,以上程序中mapPartitions执行两次,而map执行四次,虽然提升了效率,但也存在缺点

占用内存多处理完的数据不会释放 (1)map和mapPartitions的区别

数据处理角度

Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。

功能的角度

Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据

性能的角度

Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用,一般使用map操作

综上,有时完成比完美更重要

(2)java克隆浅复制

何时考虑不使用接口而是使用实现类:当需要使用实现类中的特有方法时,会定义为实现类类型,第一种声明方式,ArrayList中的特有方式无法使用,比如clone方法

public class Test { public static void main(String[] args) { User user = new User(); user.name = "zhangsan"; List userList = new ArrayList(); //userList.clone(); ArrayList userList1 = new ArrayList(); userList1.clone(); } } class User{ public String name; }

克隆之后,两块内存空间不同

userList1.add(user); ArrayList userList2 = (ArrayList)userList1.clone(); System.out.println(userList1 == userList2); //false

以下代码,两块内存地址引用了相同的对象

final User user1 = userList2.get(0); user1.name = "lisi"; System.out.println(userList1 == userList2); //false System.out.println(userList1); //[com.hike.bigdata.spark.test.User@1698c449] System.out.println(userList2); //[com.hike.bigdata.spark.test.User@1698c449]

这种现象,称为java克隆浅复制,只复制集合最外层集合的内存,但是如果集合引用了其他内存,不会复制

java克隆浅复制存在引用问题,mapPartitions处理完的数据不会释放,那么引用也不会被释放,当全部处理完成时,才会释放,意味着数据越多,持续时间越长,占用内存空间越大

(3)案例:获取每个数据分区的最大值 def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List(1,2,3,4),2) val rdd1: RDD[Int] = rdd.mapPartitions( list => { val max = list.max List(max).iterator } ) rdd1.collect().foreach(println) } 3 mapPartitionsWithIndex

现有三个分区,只获取第二个分区的数据

分区间无序,所以以下代码是错误的

var count = 0 val rdd1 = rdd.mapPartitions( list => { if(count == 1){ count = count + 1 list }else{ count = count + 1 Nil.iterator } } )

使用mapPartitionsWithIndex方法

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引

def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List(1,2,3,4,5,6),3) val rdd1 = rdd.mapPartitionsWithIndex( (index,list) => { if(index == 1){ list }else{ Nil.iterator } } ) rdd1.collect().foreach(println) } 4 flatMap

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount") val sc = new SparkContext(conf) val rdd: RDD[String] = sc.makeRDD( List("hello scala", "hello spark") ) val rdd1: RDD[String] = rdd.flatMap(_.split(" ")) rdd1.collect().foreach(println) }

flatMap输入是数据集的整体(一个),返回的是拆分后的个体(多个),使用容器将个体包装起来

val rdd1 = rdd.flatMap( str => { str.split(" ") } ) def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount") val sc = new SparkContext(conf) val rdd: RDD[List[Int]] = sc.makeRDD( List( List(1, 2), List(3, 4) ) ) val rdd1 = rdd.flatMap( List =>List ) rdd1.collect().foreach(println) }

第一个List是整体,第二个List代表的是容器

(1)案例:将List(List(1,2),3,List(4,5))进行扁平化操作

模式匹配

val rdd1 = rdd.flatMap { case list : List[_] => list case other => List(other) } 5 glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

def glom(): RDD[Array[T]]

将个体变成整体

val rdd: RDD[Int] = sc.makeRDD( List(1, 2, 3, 4, 5, 6), 2 ) val rdd1: RDD[Array[Int]] = rdd.glom() rdd1.collect().foreach(a => println(a.mkString(","))) // 1,2,3 // 4,5,6 (1)案例:计算所有分区最大值求和(分区内取最大值,分区间最大值求和) val rdd: RDD[Int] = sc.makeRDD( List(1, 2, 3, 4, 5, 6), 2 ) val rdd1: RDD[Array[Int]] = rdd.glom() val rdd2: RDD[Int] = rdd1.map(_.max) println(rdd2.collect().sum) 6 groupBy

将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中

一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

此算子根据函数计算结果进行分组,执行结果为KV键值对数据类型,K是分组标识,V为同一个组中的数据集合

val rdd: RDD[Int] = sc.makeRDD( List(1, 2, 3, 4, 5, 6), 2 ) val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2) rdd1.collect().foreach(println)

默认情况下,数据处理后所在的分区不会发生改变

Spark要求,一个组的数据必须在一个分区中

一个分区的数据被打乱,和其他分区的数据组合在一起,这个操作称为shuffle,现在想进行两分区数据相加,但第一个RDD中有很多分区,分区内有很多数据,shuffle如何做计算呢,在内存中是否等待所有数据到来之后再进行运算,即使等待,内存不够怎么办?

所以,shuffle操作不允许在内存中等待,必须落盘,因此shuffle的速度慢

shuffle会将完整的计算过程一分为二,形成两个阶段,一个阶段用于写数据,一个阶段用于读数据

写数据的阶段如果没有完成,读数据的阶段不能执行

conf.set("spark.local.dir","e:/")

通过windows下的spark环境,执行groupBy相关代码,可以在监控页面查看到“Shuffle Read”和“Shuffle Write”两个阶段

shuffle的操作可以更改分区

val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2,2) (1)案例:将List(“Hello”,“hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组。 val rdd: RDD[String] = sc.makeRDD( List("Hello","hive", "hbase", "Hadoop") ) val rdd1: RDD[(String, Iterable[String])] = rdd.groupBy(_.substring(0,1)) rdd1.collect().foreach(println)

不区分首字母大小写

val rdd1: RDD[(String, Iterable[String])] = rdd.groupBy(_.substring(0,1).toUpperCase()) (2)案例:按照agent.log中的第二列数据分组,相同求和 def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount") val sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile("data/agent.log") val groupRDD: RDD[(String, Iterable[(String, Int)])] = lines.map( lines => { val datas: Array[String] = lines.split(" ") (datas(1), 1) } ).groupBy(_._1) val value: RDD[(String, Int)] = groupRDD.mapValues(_.size) value.collect().foreach(println) }

groupBy算子可以实现WordCount(共有十种算子可以实现,1/10)



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3