spark上传文件到hdfs spark数据导入hbase

您所在的位置:网站首页 sparksql写入hbase速度优化 spark上传文件到hdfs spark数据导入hbase

spark上传文件到hdfs spark数据导入hbase

#spark上传文件到hdfs spark数据导入hbase| 来源: 网络整理| 查看: 265

spark上传文件到hdfs spark数据导入hbase 转载

jimoshalengzhou 2023-06-29 16:15:09

文章标签 spark上传文件到hdfs hadoop spark hbase apache 文章分类 Spark 大数据

hadoop spark hbase

集群环境:一主三从,Spark为Spark On YARN模式

Spark导入hbase数据方式有多种

1.少量数据:直接调用hbase API的单条或者批量方法就可以

2.导入的数据量比较大,那就需要先生成hfile文件,在把hfile文件加载到hbase里面

下面主要介绍第二种方法:

该方法主要使用spark Java API的两个方法:

1.textFile:将本地文件或者HDFS文件转换成RDD

2.flatMapToPair:将每行数据的所有key-value对象合并成Iterator对象返回(针对多family,多column)

代码如下:

package scala; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.storage.StorageLevel; import util.HFileLoader; public class HbaseBulkLoad { private static final String ZKconnect="slave1,slave2,slave3:2181"; private static final String HDFS_ADDR="hdfs://master:8020"; private static final String TABLE_NAME="DBSTK.STKFSTEST";//表名 private static final String COLUMN_FAMILY="FS";//列族 public static void run(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", ZKconnect); configuration.set("fs.defaultFS", HDFS_ADDR); configuration.set("dfs.replication", "1"); String inputPath = args[0]; String outputPath = args[1]; Job job = Job.getInstance(configuration, "Spark Bulk Loading HBase Table:" + TABLE_NAME); job.setInputFormatClass(TextInputFormat.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class);//指定输出键类 job.setMapOutputValueClass(KeyValue.class);//指定输出值类 job.setOutputFormatClass(HFileOutputFormat2.class); FileInputFormat.addInputPaths(job, inputPath);//输入路径 FileSystem fs = FileSystem.get(configuration); Path output = new Path(outputPath); if (fs.exists(output)) { fs.delete(output, true);//如果输出路径存在,就将其删除 } fs.close(); FileOutputFormat.setOutputPath(job, output);//hfile输出路径 //初始化sparkContext SparkConf sparkConf = new SparkConf().setAppName("HbaseBulkLoad").setMaster("local[*]"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); //读取数据文件 JavaRDD lines = jsc.textFile(inputPath); lines.persist(StorageLevel.MEMORY_AND_DISK_SER()); JavaPairRDD hfileRdd = lines.flatMapToPair(new PairFlatMapFunction() { private static final long serialVersionUID = 1L; @Override public Iterator call(String text) throws Exception { List tps = new ArrayList(); if(null == text || text.length() 本文章为转载内容,我们尊重原作者对文章享有的著作权。如有内容错误或侵权问题,欢迎原作者联系我们进行内容更正或删除文章。 收藏 评论 分享 举报

上一篇:python 发明人 python的发明

下一篇:python中在文件里写入两行文本 python怎么写两行代码



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3