SparkSql中DataFrame与json、csv、mysql、hive数据的互操作

您所在的位置:网站首页 spark读文件存hive表 SparkSql中DataFrame与json、csv、mysql、hive数据的互操作

SparkSql中DataFrame与json、csv、mysql、hive数据的互操作

2023-12-03 17:18| 来源: 网络整理| 查看: 265

SparkSql中DataFrame与json、csv、mysql、hive数据的互操作 1.RDD转换成DataFrame

1.1 RDD to DataFrame

RDD转成DataFrame有2种方式,一种是通过隐式转换,一种是通过SparkSession来进行创建。

1.1.1 RDD隐式转换成DataFrame(推荐)

object RDDToDF { def main(args: Array[String]): Unit = { // 创建一个SparkSession val session = SparkSession.builder().master("local[2]").appName("RDDToDF").getOrCreate() // 通过session获取SparkContext val sc = session.sparkContext // 将文件中的数据转换成RDD val rdd = sc.textFile("file:///D:/workspacescala/sparkdemo/sparksqldemo/people.txt").map(_.split(" ")).map(x => Person(x(0), x(1).toLong)) rdd.foreach(println) // 导入隐式转换(****重要****) import session.implicits._ // 将rdd转换成DataFrame val df = rdd.toDF() df.show() // 打印 DataFrame 的 Schema约束信息 df.printSchema() } } // 创建一个 case 类,Person封装RDD中的数据,需要给DataFrame形成一个Schema约束 case class Person(name: String, age: Long)

输出结果:

image

image

1.1.2 通过SaprkSession创建的方式

需要通过SchemaType来进行约束

object RDDToDF2 { def main(args: Array[String]): Unit = { val session = SparkSession.builder().master("local[2]").appName("RDDToDF2").getOrCreate() val sc = session.sparkContext // 这里不使用case class类,使用spark 提供的 Row类 val rdd = sc.textFile("file:///D:/workspacescala/sparkdemo/sparksqldemo/people.txt").map(_.split(" ")).map(x => Row(x(0), x(1).trim)) val schemaString = "name age" val splits = schemaString.split(" ") // 约束字段 val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) // 约束类型 val schema = StructType(fields) // 将RDD转换成DataFrame val df = session.createDataFrame(rdd, schema) // 显示DataFrame的数据 df.show() // 打印schema df.printSchema() } }

输出结果:

image

1.2 DataFrame几个api介绍

show():将DataFrame的数据直接输出

printSchema():打印schema约束信息

select():选择字段

filter():对DataFrame字段进行过滤

groupby():对字段进行分组

count():求数量

DataFrame还有一些其他的api,作用和sql语句类似

eg:

df.select("name").show()

结果:

image

# 这里$"age"需要导入隐式转换才行 df.select("age").filter($"age" >=28).show()

结果:

image

df.groupBy("name").count().show()

结果:

image

2.json文件转DataFrame

2.1 json转DataFrame

// 创建session对象 val session = SparkSession.builder().appName("JsonToDF").master("local[2]").getOrCreate() // 通过session 读取 json数据,转换成DataFrame对象 val df = session.read.json("file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json") // 展示数据 df.show()

结果:

image

2.2 DataFrame创建temp view,使用sql语句查询

object JsonToDF { def main(args: Array[String]): Unit = { // 创建session对象 val session = SparkSession.builder().appName("JsonToDF").master("local[2]").getOrCreate() // 通过session 读取 json数据,转换成DataFrame对象 val df = session.read.json("file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json") // 将DataFrame的数据,创建成一个临时的"表",表名:person,这个只能在当前的session域中有效 df.createOrReplaceTempView("person") // 通过sql语句进行查询 session.sql("select * from person").show() // 创建一个全局的temp“表”,这个可以跨session会话有效 df.createGlobalTempView("person") session.newSession().sql("select * from global_temp.person where age>=28").show() session.stop() } }

结果:

image

image

3.csv文件转DataFrame

3.1 csv文件转DataFrame

val session = SparkSession.builder().appName("CsvToDF").master("local[2]").getOrCreate() // 通过指定格式,load数据,option:可以给文件格式添加参数,如是否有header。如果没有header,生成的DataFrame会默认生成如:_c1,_c2 //val df = session.read.format("csv").option("header", "true").load("file:///D:/workspacescala/sparkdemo/sparksqldemo/people2.csv") // df封装的csv方法 val df = session.read.option("header", "true").csv("file:///D:/workspacescala/sparkdemo/sparksqldemo/people2.csv") df.show()

image

3.2 保存DataFrame数据到文件

object CsvToDF { def main(args: Array[String]): Unit = { val session = SparkSession.builder().appName("CsvToDF").master("local[2]").getOrCreate() val df = session.read.csv("file:///D:/workspacescala/sparkdemo/sparksqldemo/people.csv") // 导入隐式转换 import session.implicits._ // 将df的数据保存到本地文件,这里因为csv没有头信息,没有对应的schema df.filter($"_c1" > 28).write.csv("d:/csvfile") } }

csv文件的储存和内容:

image

image

4.mysql数据与DataFrame的互操作

4.1 DataFrame将数据写入Mysql

object DFToMysql { def main(args: Array[String]): Unit = { val session = SparkSession.builder().appName("DFToMysql").master("local[2]").getOrCreate() val df = session.read.json("file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json") df.show() // df写入mysql的第一种方式 val p = new Properties() p.put("user", "root") p.put("password", "root") // 将df数据写入mysql,这里mode的模式有4种:append:追加,overwrite:覆盖,error , ignore // overwrite:如果表不存在,会自动创建,覆盖会将之间定义好字段类型,进行修改 df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/db_people", "tb_people", p) //df.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/db_people", "tb_people", p) // df写入mysql的第二种方式 /*df.write.mode("append") .format("jdbc") .option("url", "jdbc:mysql://localhost:3306/db_people") .option("dbtable", "tb_people") .option("user", "root") .option("password", "root") .save()*/ } }

结果:

image

4.2 Mysql数据导入为DataFrame

object MySqlToDF { def main(args: Array[String]): Unit = { val session = SparkSession.builder().master("local[2]").appName("MySqlToDF").getOrCreate() // jdbc加载的第一种方式 val p = new Properties() p.put("user", "root") p.put("password", "root") // 通过加载jdbc,获取DataFrame数据 val df = session.read.format("jdbc").jdbc("jdbc:mysql://localhost:3306/db_people", "tb_people", properties = p) df.show() // 加载jdbc的第2中方式 /*val df2 = session.read.format("jdbc") .option("url", "jdbc:mysql://localhost:3306/db_people") .option("dbtable", "tb_people") .option("user", "root") .option("password", "root") .load() df2.show()*/ } }

image

5 Spark使用HQL查询hive中的数据

前提:需要 安装Hive

5.1 将hive-site.xml放入resource目录

image

hive-site.xml内容

javax.jdo.option.ConnectionURL jdbc:mysql://server01:3306/hive?createDatabaseIfNotExist=true JDBC connect string for a JDBC metastore javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver Driver class name for a JDBC metastore javax.jdo.option.ConnectionUserName root username to use against metastore database javax.jdo.option.ConnectionPassword root password to use against metastore database

5.2 Spark操作Hive数据

Spark新的api实现,使用SparkSession实现

object HiveToDF { def main(args: Array[String]): Unit = { // 新版本 val session = SparkSession.builder().appName("hivetosql").master("local").enableHiveSupport().getOrCreate() session.sqlContext.sql("use default") session.sqlContext.sql("select * from student").show() } }

使用HiveContext实现

object HiveToDF { def main(args: Array[String]): Unit = { // 老版本 val conf = new SparkConf().setAppName("HiveToDF").setMaster("local") val sc = new SparkContext(conf) val hiveContext = new HiveContext(sc) hiveContext.sql("use default") hiveContext.sql("select * from student").show() } }

image

补充:spark-shell直接操作Hive数据

将hive-site.xml文件放入$SPARK_HOME/conf目录下

启动spark-shell时指定mysql连接驱动位置

spark-shell \ --master spark://server01:7077 \ --executor-memory 1g \ --total-executor-cores 2 \ --driver-class-path /hadoop/hive/lib/mysql-connector-java-5.1.35-bin.jar

image

操作hive数据 # spark是SparkSession对象 spark.sqlContext.sql("select * from student").show()

image

或者

import org.apache.spark.sql.hive.HiveContext val hiveContext = new HiveContext(sc) hiveContext.sql("select * from student").show()

image

补充2:Spark-sql直接操作Hive

将hive-site.xml文件放入$SPARK_HOME/conf目录下

启动spark-shell时指定mysql连接驱动位置

spark-sql \ --master spark://server01:7077 \ --executor-memory 1g \ --total-executor-cores 2 \ --driver-class-path /hadoop/hive/lib/mysql-connector-java-5.1.35-bin.jar

image

3.操作hive数据

show tables;

image

select * from person;

image



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3