HBase的Java API的操作

您所在的位置:网站首页 hbase清空表数据命令 HBase的Java API的操作

HBase的Java API的操作

2023-05-06 01:39| 来源: 网络整理| 查看: 265

HBase的Java API的操作 精选 原创

wx62be9d88ce294 2023-04-16 07:53:43 ©著作权

文章标签 java hbase 开发语言 apache Test 文章分类 jQuery 前端开发

©著作权归作者所有:来自51CTO博客作者wx62be9d88ce294的原创作品,请联系作者获取转载授权,否则将追究法律责任

HBase的Java API的操作准备工作1- 创建一个新的工程2- 导入相关的依赖 aliyun http://maven.aliyun.com/nexus/content/groups/public/ true false never org.apache.hbase hbase-client 2.1.0 org.apache.commons commons-io 1.3.2 org.slf4j slf4j-log4j12 1.7.6 log4j log4j 1.2.16 org.apache.maven.plugins maven-compiler-plugin 3.1 1.8 1.8 3- 创建包结构: com.itheima.hbase4- 在此包下, 创建一个测试类: HBaseTestpackage com.itheima.hbase; public class HBaseTest { }创建表

需求说明:

HBase的Java API的操作_hbase

将水表抄表数据存储到HBase

操作步骤:1- 创建Java连接HBase的连接对象 2- 根据连接对象, 获取相关的管理对象: Admin(表定义语句操作) Table(表数据操作) 3- 执行相关的操作 4- 处理结果集(只有查询才需要处理结果集) 5- 释放资源

代码实现:

package com.itheima.hbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.junit.Test; public class HBaseTest { // 创建表 @Test public void test01_createTable() throws Exception{ // 1- 创建Java连接HBase的连接对象 // Configuration conf = new Configuration(); Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181"); Connection hbaseConn = ConnectionFactory.createConnection(conf); // 2- 根据连接对象, 获取相关的管理对象: Admin(表定义语句操作) Table(表数据操作) Admin admin = hbaseConn.getAdmin(); // 3- 执行相关的操作 boolean flag = admin.tableExists(TableName.valueOf("WATER_BILL")); // 存在 返回True 不存在 返回False if(!flag){ // 说明表不存在 // ColumnFamilyDescriptor : 列族的结构信息对象 ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.newBuilder("C1".getBytes()).build(); // TableDescriptor : 表的结构信息对象 TableDescriptor desc = TableDescriptorBuilder .newBuilder(TableName.valueOf("WATER_BILL")) .setColumnFamily(familyDesc).build(); admin.createTable(desc); } // 4- 处理结果集(只有查询才需要处理结果集) // 5- 释放资源 admin.close(); hbaseConn.close(); } }插入数据

代码实现:

// 插入数据 @Test public void test02_addData() throws Exception{ //1. 根据HBase的连接工厂, 创建HBase的连接对象 Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181"); Connection hbaseConn = ConnectionFactory.createConnection(conf); //2. 根据连接对象, 获取相关的管理对象: Admin Table Table table = hbaseConn.getTable(TableName.valueOf("WATER_BILL")); //3. 执行相关的操作: 添加数据 put '表名', 'rk','列名','列值' Put put = new Put("4944191".getBytes()); put.addColumn("C1".getBytes(),"name".getBytes(),"登卫红".getBytes()); put.addColumn("C1".getBytes(),"address".getBytes(),"贵州省铜仁市德江县7单元267室".getBytes()); put.addColumn("C1".getBytes(),"sex".getBytes(),"男".getBytes()); table.put(put); //4. 处理结果集 //5. 释放资源 table.close(); hbaseConn.close(); }提取公共的方法// 会在 @Test方法执行前 执行 private Connection hbaseConn; private Admin admin; private Table table; private String tableName = "WATER_BILL"; @Before public void before() throws Exception{ //1. 根据HBase的连接工厂, 创建HBase的连接对象 Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181"); hbaseConn = ConnectionFactory.createConnection(conf); //2. 根据连接对象, 获取相关的管理对象: Admin Table admin = hbaseConn.getAdmin(); table = hbaseConn.getTable(TableName.valueOf(tableName)); } // 会在@Test方法执行后 执行 @After public void after() throws Exception{ //5. 释放资源 admin.close(); table.close(); hbaseConn.close(); }查询某一条数据// 根据rowkey 查询数据 @Test public void test03_findByRowKey() throws Exception{ //3. 执行相关的操作: rowkey查询 get '表名','rk' Get get = new Get("4944191".getBytes()); // get.addColumn("C1".getBytes(),"name".getBytes()); // get.addColumn("C1".getBytes(),"sex".getBytes()); // get.addFamily("C1".getBytes()); Result result = table.get(get); // 一个result 表示就是一行数据, 而一行数据是由多个单元格构成的 //4. 处理结果集 // 4.1 从result中拿到每一个单元格 List cells = result.listCells(); // 4.2 遍历每一个单元格 for (Cell cell : cells) { // 4.3 从单元格中获取数据 // byte[] familyArray = cell.getFamilyArray(); // String family = new String(familyArray, cell.getFamilyOffset(), cell.getFamilyLength()); // System.out.println(family); // 获取rowkey String rowkey = Bytes.toString(CellUtil.cloneRow(cell)); // 获取 列族 String family = Bytes.toString(CellUtil.cloneFamily(cell)); // 获取 列名 String columnName = Bytes.toString(CellUtil.cloneQualifier(cell)); // 获取列值 String columnValue = Bytes.toString(CellUtil.cloneValue(cell)); System.out.println("rowkey为:"+rowkey +"; 列族为:"+family +"; 列名为:"+columnName+";列值为:"+columnValue); } }如何删除某一条数据// 删除数据 @Test public void test04_deleteData() throws Exception{ //3. 执行相关的操作 Delete delete = new Delete("4944191".getBytes()); // 直接删除一整行数据的 // delete.addFamily("C1".getBytes()); 支持删除某一列族下的数据 // delete.addColumn("C1".getBytes(),"name".getBytes()); 支持删除某一列的数据 table.delete(delete); }如何删除表// 删除表 @Test public void test05_deleteTable() throws Exception{ //3. 执行相关的操作: 删除表 // 3.1 判断表是否是禁用状态 boolean flag = admin.isTableDisabled(TableName.valueOf(tableName)); // 禁用 为True if (!flag){ // 表示表没有被禁用, 首先要先禁用表 admin.disableTable(TableName.valueOf(tableName)); } admin.deleteTable(TableName.valueOf(tableName)); }导入数据操作导入数据语法hbase org.apache.hadoop.hbase.mapreduce.Import 表名 HDFS数据文件路径 注意: 此命令需要在linux的命令窗口下执行(而非Hbase的shell窗口下)导出数据语法hbase org.apache.hadoop.hbase.mapreduce.Export 表名 导出路径 注意: 此命令需要在linux的命令窗口下执行(而非Hbase的shell窗口下)

完成10W抄表数据导入操作:

HBase的Java API的操作_开发语言_02

1- 将资料中 10w的抄表数据上传到Linux 2- 在linux中将文件上传到HDFS中: hdfs dfs -mkdir -p /hbase/water_bill/input hdfs dfs -put part-m-00000_10w /hbase/water_bill/input 3- 执行导入数据操作 hbase org.apache.hadoop.hbase.mapreduce.Import WATER_BILL /hbase/water_bill/input/part-m-00000_10w基于SCAN扫描数据

需求: 查询 2020年 6月份的所有用户的用水量 :

日期字段: C1: RECORD_DATE (String)

用水量字段: C1:NUM_USAGE (Double)

用户字段: C1:NAME(String)

/* 需求: 查询 2020年 6月份的所有用户的用水量 : 日期字段: C1: RECORD_DATE (String) 用水量字段: C1:NUM_USAGE (Double) 用户字段: C1:NAME(String) SQL: select 用户, 日期, 用水量 from water_bill where RECORD_DATE >= '2020-06-01' and RECORD_DATE < '2020-07-01' */ @Test public void test06_findScan() throws Exception{ //3- 执行相关的操作 Scan scan = new Scan(); // 3.1 设置过滤器 SingleColumnValueFilter filter1 = new SingleColumnValueFilter( "C1".getBytes(),"RECORD_DATE".getBytes(), CompareOperator.GREATER_OR_EQUAL,new BinaryComparator("2020-06-01".getBytes())); SingleColumnValueFilter filter2 = new SingleColumnValueFilter( "C1".getBytes(),"RECORD_DATE".getBytes(), CompareOperator.LESS,new BinaryComparator("2020-07-01".getBytes())); // 将过滤器组合在一起, 默认的关系为并列 FilterList filterList = new FilterList(); filterList.addFilter(filter1); filterList.addFilter(filter2); //3.2 限制返回来的字段 scan.addColumn("C1".getBytes(),"NAME".getBytes()); scan.addColumn("C1".getBytes(),"RECORD_DATE".getBytes()); scan.addColumn("C1".getBytes(),"NUM_USAGE".getBytes()); // 3.3 限制返回来的条数 scan.setLimit(10); // 3.4 根据rowkey的范围获取数据 // scan.withStartRow("开始rowkey".getBytes()); // scan.withStopRow("结束rowkey".getBytes()); scan.setFilter(filterList); ResultScanner results = table.getScanner(scan); //4. 处理结果集 // 4.1 遍历results 获取每一行的result对象 for (Result result : results) { // 4.2 获取每一行的所有的单元格 List cellList = result.listCells(); // 4.3 遍历每一个单元格 for (Cell cell : cellList) { // 4.4 从单元格中获取数据 // 获取rowkey String rowkey = Bytes.toString(CellUtil.cloneRow(cell)); // 获取 列族 String family = Bytes.toString(CellUtil.cloneFamily(cell)); // 获取列名 String columnName = Bytes.toString(CellUtil.cloneQualifier(cell)); // 获取 列值 Object columnValue; if(columnName.equals("NUM_USAGE")){ columnValue = Bytes.toDouble(CellUtil.cloneValue(cell)); }else { columnValue = Bytes.toString(CellUtil.cloneValue(cell)); } System.out.println("rowkey:"+ rowkey+";列族:"+family+";列名为:"+columnName+";列值为:"+columnValue); } System.out.println("========================"); } }

收藏 评论 分享 举报

上一篇:HBase的高可用及集群架构

下一篇:HBase介绍及集群搭建



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3