Spark读取文本文件并转换为DataFrame

您所在的位置:网站首页 spark怎么读取文本文件数据 Spark读取文本文件并转换为DataFrame

Spark读取文本文件并转换为DataFrame

2024-07-11 21:23| 来源: 网络整理| 查看: 265

本文将介绍spark读取多列txt文件后转成DataFrame的两种方法。

数据是Spark中自带的:sample_movielens_ratings.txt

//形式如下面所示 0::2::3::1424380312 0::3::1::1424380312 0::5::2::1424380312 0::9::4::1424380312 0::11::1::1424380312 0::12::2::1424380312 0::15::1::1424380312 0::17::1::1424380312 0::19::1::1424380312 0::21::1::1424380312 0::23::1::1424380312

 

一、通过反射机制将RDD转为DataFrame

  Scala由于其具有隐式转换的特性,所以Spark SQL的Scala接口,是支持自动将包含了case class的RDD转换为DataFrame的。case class就定义了元数据。Spark SQL会通过反射读取传递给case class的参数的名称,然后将其作为列名。

import org.apache.spark.ml.linalg.Vectors import spark.implicits._ case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long) val rdd = sc.textFile("/data/mllib/als/sample_movielens_ratings.txt") def parseRating(str: String): Rating = { val fields = str.split("::") assert(fields.size == 4) Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong) } val ratings = spark.read.textFile("/data/mllib/als/sample_movielens_ratings.txt") .map(parseRating) .toDF() ratings.printSchema ratings.show() 二、通过动态编程的方式将RDD转为DataFrame import org.apache.spark.sql.types._ import org.apache.spark.sql.Row val rdd = sc.textFile("/data/mllib/als/sample_movielens_ratings.txt") val schema = StructType(Array( StructField("userId", IntegerType, true), StructField("movieId", IntegerType, true), StructField("rating", FloatType, true), StructField("timestamp", LongType, true) )) // 对每一行的数据进行处理 val rowRDD = rdd.map(_.split("::")).map(p => Row(p(0).toInt,p(1).toInt,p(2).toFloat,p(3).toLong)) val data = spark.createDataFrame(rowRDD, schema) data.printSchema data.createOrReplaceTempView("test") spark.sql("select *from test").show()


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3