Spark数据读写 |
您所在的位置:网站首页 › hdfs数据写入hbase › Spark数据读写 |
1. 本地文件的读写
1.1 读文件
import org.apache.spark.sql.SparkSession
val inputPath = "file:///Users/zz/Desktop/aa.sh"
val rdd = spark.sparkContext.textFile(inputPath)
上面代码执行后,因为Spark的惰性机制,并不会真正执行,所以即使路径错误,此时也不会报错。 1.2 写文件 val outputPath = "/Users/zz/Desktop/output" rdd.saveAsTextFile(outputPath) // 再次加载 val rdd = spark.sparkContext.textFile(outputPath)JSON (JavaScript Object Notation) 是一种轻量级的数据交换格式。 aa.json {"name": "aa"} {"name": "bb", "age":30}{"name": "cc", "age":18} val inputPath = "/Users/zenmen/Desktop/aa.json" val rdd = spark.sparkContext.textFile(inputPath) rdd.foreach(println)JSON数据解析 import scala.util.parsing.json.JSON val inputPath = "/Users/zenmen/Desktop/aa.json" val rdd = spark.sparkContext.textFile(inputPath) val result = rdd.map(JSON.parseFull(_)) result.foreach({ r => r match { case Some(map: Map[String, Any]) => println(map) case None => println("Parsing Failed!!!") case other => println("Unknown data structure: " + other) } })输出 Map(name -> aa) Map(name -> bb, age -> 30.0) Map(name -> cc, age -> 18.0) 4. HBase 读写 4.1 HBase简介HBase是google 对BigTable的开源实现,解决Google内部大规模网页搜索问题。 1. HBase是一个稀疏、多维度、排序的映射表,这张表的索引是行键、列族、列限定符和时间戳; 2. 每个值是一个未经解释的字符串,没有数据类型; 3. 用户在表中存储数据,每一行都有一个可排序的行键和任意多的列。 表:HBase采用表来组织数据,表由行和列组成,列划分为若干个列族。 行: 每个HBase表都有若干个行组成,每个行由 row key 来标志。 列族:1个HBase表被分成许多列族(Column Family)的集合,它是基本的访问控制单元。 列限定符:列族里的数据通过列限定符或列来定位。 时间戳:每个单元格都保存着同一份数据的多个版本,这些版本采用时间戳进行索引。 单元格:在HBase表中,通过行、列族和列限定符确定一个单元格cell,单元格中存储的数据没有数据类型,总被视为字节数据byte[]。 把HBase的lib目录下的一些jar文件拷贝到Spark中。 表名: student, 列族:info import org.apache.spark.sql.SparkSession import org.apache.hadoop.conf.Configuration //import org.apache.hadoop.hbase.HBaseConfiguration //import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.io.ImmutableBytesWritable object Test { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("wc") .master("local") .getOrCreate() val conf = HBaseConfiguration.create() // 设置查询的表名 conf.set(TableInputFormat.INPUT_TABLE, "student") val stuRDD = spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result] ) val count = stuRDD.count() print("Students RDD Count: " + count) // 缓存一下,以免后面重复生成 stuRDD.cache() stuRDD.foreach({ case(_,result) => val key = Bytes.toString(result.getRow) val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes)) val gender = Bytes.toString(result.getValue("info".getBytes, "gender".getBytes)) val age = Bytes.toString(result.getValue("info".getBytes, "age".getBytes)) println("Row key: " + key + "Name: " + name + "Gender: "+ gender + "Age: " + age) }) } }打包编译 在simple.sbt中写入下面内容,注意对应版本号! 采用sbt打包,submit提交运行 运行 查看结果 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |