Spark

您所在的位置:网站首页 对文字进行字数统计操作 Spark

Spark

2024-07-11 20:13| 来源: 网络整理| 查看: 265

 

一 、什么是RDD ?

 

         RDD在Spark【Scala语言】中,是一种数据结构【基于内存,可持久化】,就好比Java的ArrayList一样,可以进行各种的Action操作,比如Java中的List集合,可以进行get【获取元素】、add【增加元素】、remove【移除元素】等操作;

         当然,Scala语言底层实现是基于JVM的,即Scala兼容Java语言【但高效于Java】,因此,Java的List集合可以直接拿来在Scala中使用;

         对于Spark的RDD,一样有其对应的Action【行动】操作,主要操作有以下几个

 

collect:返回RDD所有元素【注:如果spark为集群模式,则从各个work节点上抓取RDD数据】 count  :  统计RDD数据集中的元素个数 reduce:并行整合RDD数据集中的元素,如(a,b)=> a+b【累加求和】, (a,b) = > if(a>b) a else b【渐进比大小,求最大元素】 foreach(func) :遍历RDD数据集中的元素,并进行func【函数】操作。如:println【打印函数】 ....etc

 

       RDD不单单是一个数据集【数据结构】,它的全称可是:弹性分布式数据集【Resilient Distributed Datasets】

       如此高逼格的名称可不是随便叫的,为什么呢?

 

 

(1)为什么称是弹性的 【个人理解】

 

        弹簧我们知道,可长可短,即可伸缩;

        在整个Spark计算的过程中,都是围绕着RDD数据集来的,即,计算一开始会产生N个RDD数据集,随着计算的推进,N个RDD会被转来转去,但是最终会得到一个RDD数据集,也就是我们想要的计算结果。

        而且整个过程是流水线【并行】的,每个流水线上【work节点】执行过程得到的RDD不用等其他流水线上的RDD,区别于MapReduce【reduce任务需要等所有的map任务完成后,才能进行】;

        而执行过程中得到的RDD都是基于内存的,因此Spark的执行效率要远高于Hadoop的MapReduce,因为MapReduce的每一个map和reduce任务都要读写磁盘【IO开销很大】

 

(2)分布式

 

       这个不做过多解释,只要涉及大数据,必提分布式

       字眼如:多台机器、并行、分区、任务调度...etc【集群】

    

(3)数据集

 

        不要见到RDD,就如同看到了三个陌生的字母一样,它是一种数据结构,准确说是一种基于内存的数据结构,在准确点说,就是scala语言对数据集在内存中的一种封装【包装】,至于为什么这样做,我说不下去了....,

 

      

 

 

 二、怎么得到一个RDD呢?

 

 

       注意这里我用的是得到,而不是创建【创建让人有种即将写代码的紧迫感,有没有】

       在我没有开始写demo来突出本篇博文的主题时,我们还是来想想,既然上面说了什么是RDD,那么,这个抽象的东西究竟怎么获得呢?

       上面我们说过,RDD是有行动【Action】操作的,也就是RDD常用到的几个函数【count、reduce...etc】,所谓的行动操作是基于RDD数据集的,注意,得先有RDD,才能进行下一步函数的调用

        在Spark中,我们把得到【创建】RDD的过程叫做RDD的转换【Transformation】过程,它是一种延迟操作,为什么这样说呢?

      [  我去,越扯越多,本来三五行demo就能搞定的博文,我居然....... ]

       这就要提到Spark的惰性机制了,RDD在转换的过程中,看似得到了一个RDD,其实这个RDD是个虚的,并没有立马在内存中创建,只有我们在执行行动操作的时候,这个RDD才从头开始执行并在内存中创建,话又说回来,RDD的转换过程有哪些常用的函数呢【也就是RDD的创建函数】

       [  太抽象? 别急,欠的demo示例,一会一起补上 ]

 

      

textFile:从本地或者HDFS文件系统中的指定文件中获取RDD数据集 map:  对数据集中的元素按照某种规则进行转换,得到新的RDD filter:对数据集中的元素进行某种规则的过滤转换,得到新的RDD .... etc

       

 

 

三、RDD简单操作

 

 

         该说的也说的差不多了,下面就把上面欠的demo补上

 

 

功能:统计word.txt文件中,单行单词数最大的,并输出结果

 

步骤:

input:  本地word.txt

转换操作:RDD1  ---> RDD2 --> RDD3

执行操作:RDD3.reduce

output:输出结果

 

 

(1)创建input  【随便找个目录,word.txt 如下】

 

 

 

a b d d e f a a d c c e h j k o i k l m n b v q w e r t y u i are v x a q w e

 

 

 

(2)将文件转换成RDD 【使用转换函数 -- textFile】  --- RDD1

 

【注:本地模式非集群模式,且demo演示主要在spark-shell中进行演示】

 

  var lines = sc.textFile("word01.txt")

 

 

 

 

注意,上面是一个完整的计算过程,如果单有RDD的转换过程是无法真正在内存中创建一个RDD的,如下面这种:

 

 

 

 

  看似正常,感觉我们好像创建了一个RDD,但是,当我们真正使用它的时候,却发现报异常了

 

 

 

这就是Spark的惰性【延迟】机制,如果RDD在一开始转换的时候就在内存中创建数据集的话,那么一开始就会报文件不存在的异常,而不是在我们调用RDD的执行函数时才发现

 

正常点,我们打印下正确的RDD的数据集的内容:

 

lines.foreach(println)

 

 

 

(3)将上一步得到的lines进一步转换,生成新的RDD  --- RDD2

 

利用map函数配合lambda表达式,重新转换lines中的元素,将每一行文本按照空格拆分成一个stirng单词数组

var lineArray = lines.map(line => line.split(" "))   

 

 

 

(4)将上一步得到的lineArray再次转换,得到新的RDD --- RDD3

 

利用map函数配合lambda表达式,再次将上一步得到的lineArray中的元素进行转换,得到新的RDD数据集lineWordSize

 

 

 

(5) 对上一步得到的RDD进行Action操作,得到最终结果 --  RDD3 --> output

 

利用reduce函数,对RDD3数据集进行整合,找出最大的单词数 

 

 

 

 

(6) 核对计算结果

 

 

注:第二行文本,多了一个空格,因此统计出来的是10个单词【不要有疑问哈】

 

 

 

 

 

四、RDD简单操作   -- 一气呵成【骚里骚气】

 

 

 【三】步骤有点太繁琐了,scala本身就是一个高效、简洁的语言(同Python),因此,我们用最简洁的方式在跑一遍demo,拿到我们最终要的计算结果:12

 

  【看好了,不要眨眼...... 激动人心的时刻到了,哈哈】

 

var result = sc.textFile("word01.txt").map(line => line.split(" ").size).reduce((a,b) => if(a>b) a else b)

 

 

 

一行代码搞定!!!!    ----   流水线操作

 

 

吾日三省吾身:" 你今天学习了吗? "

 

 

 

 

 

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3