Spark读取Hive数据写入Hbase

您所在的位置:网站首页 spark读取hbase Spark读取Hive数据写入Hbase

Spark读取Hive数据写入Hbase

2024-01-18 10:35| 来源: 网络整理| 查看: 265

使用spark方式写入

在使用Spark时经常需要把数据落入HBase中,如果使用普通的Java API,写入会速度很慢。还好Spark提供了Bulk写入方式的接口。那么Bulk写入与普通写入相比有什么优势呢?

BulkLoad不会写WAL,也不会产生flush以及split。如果我们大量调用PUT接口插入数据,可能会导致大量的GC操作。除了影响性能之外,严重时甚至可能会对HBase节点的稳定性造成影响。但是采用Bulk就不会有这个顾虑。过程中没有大量的接口调用消耗性能写数据的过程将数据一条条插入到Hbase中,这种方式运行慢且在导入的过程的占用Region资源导致效率低下,所以很不适合一次性导入大量数据,解决办法就是使用 Bulk Load 方式批量导入数据。Bulk Load 方式由于利用了 HBase 的数据信息是按照特定格式存储在 HDFS 里的这一特性,直接在 HDFS 中生成持久化的 HFile 数据格式文件,然后完成巨量数据快速入库的操作,配合 MapReduce 完成这样的操作,不占用 Region 资源,不会产生巨量的写入 I/O,所以需要较少的 CPU 和网络资源。Bulk Load 的实现原理是通过一个 MapReduce Job 来实现的,通过 Job 直接生成一个 HBase 的内部 HFile 格式文件,用来形成一个特殊的 HBase 数据表,然后直接将数据文件加载到运行的集群中。与使用HBase API相比,使用Bulkload导入数据占用更少的CPU和网络资源。

代码如下:

package HiveData2Hbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.mapreduce.Job import org.apache.spark.rdd.RDD import org.apache.spark.sql.{ Row, SparkSession} /** * Created by BaronND */ class HBaseBulkDemo { } object HBaseBulkDemo{ def main(args: Array[String]): Unit = { val table=args(0) //数据源表 val dt=args(1)//分区日期 val spark: SparkSession = SparkSession .builder() .appName("HBase2Hive") .enableHiveSupport() .config("spark.sql.warehouse.dir", "/user/spark/warehouse") .config("spark.sql.shuffle.partitions", 1000) .getOrCreate() var testData=spark.sql("select reverse(cast(mobile as string)) as rowkey,cast(v as string) as orders" + " from "+table +" where dt = " +dt) // testData.show() import spark.implicits._ val df: RDD[Row]=testData.toDF().rdd val rdd=df.map(row=>{ val put=new Put(Bytes.toBytes(row.getAs("rowkey").toString)) put.addColumn(Bytes.toBytes("orders"), Bytes.toBytes("name"), Bytes.toBytes(row.getAs("orders").toString)) (new ImmutableBytesWritable,put) }) val conf=new Configuration() conf.set("hbase.zookeeper.quorum","ip-10-0-0-161") conf.set("hbase.zookeeper.property.clientPort","2181") conf.set(TableOutputFormat.OUTPUT_TABLE,"half_year_orders") val job=Job.getInstance(conf) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Result]) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) rdd.saveAsNewAPIHadoopDataset(job.getConfiguration) spark.stop() } }

pom.xml文件的配置信息,可能自己的环境上没有相关的依赖,那么就可以自己加上jar包,不必要 造成环境垃圾。

com.google.code.gson gson 2.3.1 org.apache.spark spark-yarn_2.11 2.1.0 provided org.elasticsearch elasticsearch-spark-20_2.11 5.6.5 com.alibaba fastjson 1.2.48 com.fasterxml.jackson.core jackson-databind 2.6.1 org.elasticsearch elasticsearch 5.6.5 org.elasticsearch.client transport 5.6.5 org.apache.hadoop hadoop-common 2.7.3 org.apache.hadoop hadoop-hdfs 2.7.3 org.apache.spark spark-core_2.11 2.1.0 org.apache.spark spark-streaming_2.11 2.1.0 provided org.apache.spark spark-sql_2.11 2.1.0 org.apache.spark spark-mllib_2.11 2.1.0 org.apache.spark spark-hive_2.11 2.1.0 org.apache.hbase hbase-client 1.3.0 org.apache.hbase hbase-common 1.3.0 org.apache.hbase hbase-hadoop-compat 1.3.0 org.apache.hbase hbase-annotations 1.3.0 org.apache.hbase hbase-hadoop2-compat 1.3.0 org.apache.hbase hbase-protocol 1.3.0 org.apache.hbase hbase-server 1.3.0 org.apache.zookeeper zookeeper 3.4.5 commons-io commons-io 2.1 org.spark-project.protobuf protobuf-java 2.5.0-spark 1.8 1.8 org.apache.maven.plugins maven-compiler-plugin 1.8 1.8 org.apache.maven.plugins maven-assembly-plugin 2.6 jar-with-dependencies HiveData2Hbase.HBaseBulkDemo make-assembly package single

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3