SparkSql中DataFrame与json、csv、mysql、hive数据的互操作 |
您所在的位置:网站首页 › spark读文件存hive表 › SparkSql中DataFrame与json、csv、mysql、hive数据的互操作 |
SparkSql中DataFrame与json、csv、mysql、hive数据的互操作
1.RDD转换成DataFrame
1.1 RDD to DataFrame RDD转成DataFrame有2种方式,一种是通过隐式转换,一种是通过SparkSession来进行创建。 1.1.1 RDD隐式转换成DataFrame(推荐) object RDDToDF { def main(args: Array[String]): Unit = { // 创建一个SparkSession val session = SparkSession.builder().master("local[2]").appName("RDDToDF").getOrCreate() // 通过session获取SparkContext val sc = session.sparkContext // 将文件中的数据转换成RDD val rdd = sc.textFile("file:///D:/workspacescala/sparkdemo/sparksqldemo/people.txt").map(_.split(" ")).map(x => Person(x(0), x(1).toLong)) rdd.foreach(println) // 导入隐式转换(****重要****) import session.implicits._ // 将rdd转换成DataFrame val df = rdd.toDF() df.show() // 打印 DataFrame 的 Schema约束信息 df.printSchema() } } // 创建一个 case 类,Person封装RDD中的数据,需要给DataFrame形成一个Schema约束 case class Person(name: String, age: Long)输出结果: 1.1.2 通过SaprkSession创建的方式 需要通过SchemaType来进行约束 object RDDToDF2 { def main(args: Array[String]): Unit = { val session = SparkSession.builder().master("local[2]").appName("RDDToDF2").getOrCreate() val sc = session.sparkContext // 这里不使用case class类,使用spark 提供的 Row类 val rdd = sc.textFile("file:///D:/workspacescala/sparkdemo/sparksqldemo/people.txt").map(_.split(" ")).map(x => Row(x(0), x(1).trim)) val schemaString = "name age" val splits = schemaString.split(" ") // 约束字段 val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) // 约束类型 val schema = StructType(fields) // 将RDD转换成DataFrame val df = session.createDataFrame(rdd, schema) // 显示DataFrame的数据 df.show() // 打印schema df.printSchema() } }输出结果: 1.2 DataFrame几个api介绍 show():将DataFrame的数据直接输出 printSchema():打印schema约束信息 select():选择字段 filter():对DataFrame字段进行过滤 groupby():对字段进行分组 count():求数量 DataFrame还有一些其他的api,作用和sql语句类似 eg: df.select("name").show()结果: 结果: 结果: 2.1 json转DataFrame // 创建session对象 val session = SparkSession.builder().appName("JsonToDF").master("local[2]").getOrCreate() // 通过session 读取 json数据,转换成DataFrame对象 val df = session.read.json("file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json") // 展示数据 df.show()结果: 2.2 DataFrame创建temp view,使用sql语句查询 object JsonToDF { def main(args: Array[String]): Unit = { // 创建session对象 val session = SparkSession.builder().appName("JsonToDF").master("local[2]").getOrCreate() // 通过session 读取 json数据,转换成DataFrame对象 val df = session.read.json("file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json") // 将DataFrame的数据,创建成一个临时的"表",表名:person,这个只能在当前的session域中有效 df.createOrReplaceTempView("person") // 通过sql语句进行查询 session.sql("select * from person").show() // 创建一个全局的temp“表”,这个可以跨session会话有效 df.createGlobalTempView("person") session.newSession().sql("select * from global_temp.person where age>=28").show() session.stop() } }结果: 3.1 csv文件转DataFrame val session = SparkSession.builder().appName("CsvToDF").master("local[2]").getOrCreate() // 通过指定格式,load数据,option:可以给文件格式添加参数,如是否有header。如果没有header,生成的DataFrame会默认生成如:_c1,_c2 //val df = session.read.format("csv").option("header", "true").load("file:///D:/workspacescala/sparkdemo/sparksqldemo/people2.csv") // df封装的csv方法 val df = session.read.option("header", "true").csv("file:///D:/workspacescala/sparkdemo/sparksqldemo/people2.csv") df.show()3.2 保存DataFrame数据到文件 object CsvToDF { def main(args: Array[String]): Unit = { val session = SparkSession.builder().appName("CsvToDF").master("local[2]").getOrCreate() val df = session.read.csv("file:///D:/workspacescala/sparkdemo/sparksqldemo/people.csv") // 导入隐式转换 import session.implicits._ // 将df的数据保存到本地文件,这里因为csv没有头信息,没有对应的schema df.filter($"_c1" > 28).write.csv("d:/csvfile") } }csv文件的储存和内容: 4.1 DataFrame将数据写入Mysql object DFToMysql { def main(args: Array[String]): Unit = { val session = SparkSession.builder().appName("DFToMysql").master("local[2]").getOrCreate() val df = session.read.json("file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json") df.show() // df写入mysql的第一种方式 val p = new Properties() p.put("user", "root") p.put("password", "root") // 将df数据写入mysql,这里mode的模式有4种:append:追加,overwrite:覆盖,error , ignore // overwrite:如果表不存在,会自动创建,覆盖会将之间定义好字段类型,进行修改 df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/db_people", "tb_people", p) //df.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/db_people", "tb_people", p) // df写入mysql的第二种方式 /*df.write.mode("append") .format("jdbc") .option("url", "jdbc:mysql://localhost:3306/db_people") .option("dbtable", "tb_people") .option("user", "root") .option("password", "root") .save()*/ } }结果: 4.2 Mysql数据导入为DataFrame object MySqlToDF { def main(args: Array[String]): Unit = { val session = SparkSession.builder().master("local[2]").appName("MySqlToDF").getOrCreate() // jdbc加载的第一种方式 val p = new Properties() p.put("user", "root") p.put("password", "root") // 通过加载jdbc,获取DataFrame数据 val df = session.read.format("jdbc").jdbc("jdbc:mysql://localhost:3306/db_people", "tb_people", properties = p) df.show() // 加载jdbc的第2中方式 /*val df2 = session.read.format("jdbc") .option("url", "jdbc:mysql://localhost:3306/db_people") .option("dbtable", "tb_people") .option("user", "root") .option("password", "root") .load() df2.show()*/ } }前提:需要 安装Hive 5.1 将hive-site.xml放入resource目录 hive-site.xml内容 javax.jdo.option.ConnectionURL jdbc:mysql://server01:3306/hive?createDatabaseIfNotExist=true JDBC connect string for a JDBC metastore javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver Driver class name for a JDBC metastore javax.jdo.option.ConnectionUserName root username to use against metastore database javax.jdo.option.ConnectionPassword root password to use against metastore database5.2 Spark操作Hive数据 Spark新的api实现,使用SparkSession实现 object HiveToDF { def main(args: Array[String]): Unit = { // 新版本 val session = SparkSession.builder().appName("hivetosql").master("local").enableHiveSupport().getOrCreate() session.sqlContext.sql("use default") session.sqlContext.sql("select * from student").show() } }使用HiveContext实现 object HiveToDF { def main(args: Array[String]): Unit = { // 老版本 val conf = new SparkConf().setAppName("HiveToDF").setMaster("local") val sc = new SparkContext(conf) val hiveContext = new HiveContext(sc) hiveContext.sql("use default") hiveContext.sql("select * from student").show() } }将hive-site.xml文件放入$SPARK_HOME/conf目录下 启动spark-shell时指定mysql连接驱动位置 spark-shell \ --master spark://server01:7077 \ --executor-memory 1g \ --total-executor-cores 2 \ --driver-class-path /hadoop/hive/lib/mysql-connector-java-5.1.35-bin.jar或者 import org.apache.spark.sql.hive.HiveContext val hiveContext = new HiveContext(sc) hiveContext.sql("select * from student").show()将hive-site.xml文件放入$SPARK_HOME/conf目录下 启动spark-shell时指定mysql连接驱动位置 spark-sql \ --master spark://server01:7077 \ --executor-memory 1g \ --total-executor-cores 2 \ --driver-class-path /hadoop/hive/lib/mysql-connector-java-5.1.35-bin.jar3.操作hive数据 show tables; |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |