RDD的三种创建方式

您所在的位置:网站首页 java创建数组的方式有哪些种类 RDD的三种创建方式

RDD的三种创建方式

2024-07-13 07:32| 来源: 网络整理| 查看: 265

Spark提供三种创建RDD方式: 集合、本地文件、HDFS文件

使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造一些测试数据,来测试后面的spark应用程序的流程。使用本地文件创建RDD,主要用于临时性地处理一些存储了大量数据的文件使用HDFS文件创建RDD,是最常用的生产环境的处理方式,主要可以针对HDFS上存储的数据,进 行离线批处理操作 集合

如果要通过集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上,形成一个分布式的数据集合,也就是一个RDD。

相当于,集合中的部分数据会到一个节点上,而另一部分数据会到其它节点上。然后就可以用并行的方式来操作这个分布式数据集合了。

调用parallelize()时,有一个重要的参数可以指定,就是将集合切分成多少个partition。 Spark会为每一个partition运行一个task来进行处理。 Spark默认会根据集群的配置来设置partition的数量。我们也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量,例如:parallelize(arr, 5)

scala版本

import org.apache.spark.{SparkConf, SparkContext} /** * 需求:使用集合创建RDD */ object CreateRddByArrayScala { def main(args: Array[String]): Unit = { // 创建SparkContext val conf = new SparkConf() conf.setAppName("CreateRddByArrayScala ") // 设置任务名称 .setMaster("local") // local表示在本地执行 val sc = new SparkContext(conf) // 创建集合 var arr = Array(1, 2, 3, 4, 5) // 基于集合创建RDD val rdd = sc.parallelize(arr) val sum = rdd.reduce(_ + _) println(sum) // 停止SparkContext sc.stop() } }

Java版本

import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import java.util.ArrayList; import java.util.Arrays; import java.util.List; /** * 需求:使用集合创建RDD */ public class CreateRddByArrayJava { public static void main(String[] args) { // 创建SparkContext: SparkConf conf = new SparkConf(); conf.setAppName("CreateRddByArrayJava") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 创建集合 List arr = Arrays.asList(1, 2, 3, 4, 5); JavaRDD rdd = sc.parallelize(arr); Integer sum = rdd.reduce(new Function2() { @Override public Integer call(Integer i1, Integer i2) throws Exception { return i1 + i2; } }); System.out.println(sum); // 停止sparkContext sc.stop(); } } 使用本地文件和HDFS文件

通过SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD,RDD中的每个元素就是文件中的一行文本内容

textFile()方法支持针对目录、压缩文件以及通配符创建RDD

Spark默认会为HDFS文件的每一个Block创建一个partition,也可以通过textFile()的第二个参数手动设置分区数量,只能比Block数量多,不能比Block数量少,比Block数量少的话设置是不生效的

scala版本

import org.apache.spark.{SparkConf, SparkContext} /** * 需求:通过文件创建RDD * * 1:本地文件 * 2:HDFS文件 */ object CreateRddByFileScala { def main(args: Array[String]): Unit = { // 创建SparkContext val conf = new SparkConf() conf.setAppName("CreateRddByFileScala") // 设置任务名称 .setMaster("local") // local表示在本地执行 val sc = new SparkContext(conf) var path = "~/hello.txt" path = "hdfs://bigdata01:9000/test/hello.txt" // 读取文件数据,可以在textFile中指定生成的RDD的分区数量 val rdd = sc.textFile(path,2) // 获取每一行数据的长度,计算文件内数据的总长度 val length = rdd.map(_.length).reduce(_ + _) println(length) sc.stop() } }

Java版本

import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; public class CreateRddByFileJava { public static void main(String[] args) { // 创建SparkContext SparkConf conf = new SparkConf(); conf.setAppName("CreateRddByArrayJava") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); String path = "~/hello.txt"; path = "hdfs://bigdata01:9000/test/hello.txt"; // 读取文件数据,可以在textFile中指定生成的RDD的分区数量 JavaRDD rdd = sc.textFile(path,2); // 获取每一行数据的长度 JavaRDD lengthRDD = rdd.map(new Function() @Override public Integer call(String line) throws Exception { return line.length(); } }); // 计算文件内数据的总长度 Integer length = lengthRDD.reduce(new Function2


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3