使用 Spark 来读写 HBase 数据 |
您所在的位置:网站首页 › Hbase使用ip连接 › 使用 Spark 来读写 HBase 数据 |
你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。 使用 Apache Spark 读取和写入 Apache HBase 数据 项目 03/25/2023通常使用 Apache HBase 的低级别 API(扫描、获取和放置)或者通过 Apache Phoenix 使用 SQL 语法来查询 Apache HBase。 Apache 还提供 Apache Spark HBase 连接器。 对于查询和修改 HBase 存储的数据,使用该连接器是一种便捷高效的替代方式。 先决条件部署在同一虚拟网络中的两个单独的 HDInsight 群集。 一个HBase 和一个至少安装了 Spark 2.1 (HDInsight 3.6) 的 Spark。 有关详细信息,请参阅使用 Azure 门户在 HDInsight 中创建基于 Linux 的群集。 群集主存储的 URI 方案。 对于 Azure Blob 存储,此架构为 wasb://;对于 Azure Data Lake Storage Gen2,此架构为 abfs://;对于 Azure Data Lake Storage Gen1,此架构为 adl://。 如果为 Blob 存储启用了安全传输,则 URI 将为 wasbs://。 另请参阅安全传输。 整体进程让 Spark 群集能够查询 HBase 群集的主要过程如下所示: 在 HBase 中准备一些示例数据。 从 HBase 群集配置文件夹 (/etc/hbase/conf) 获取 hbase-site.xml 文件,并将 hbase-site.xml 的副本放入 Spark 2 配置文件夹 (/etc/spark2/conf)。 (可选:使用 HDInsight 团队提供的脚本来自动执行此过程) 运行 spark-shell,在 packages 中按 Maven 坐标来引用 Spark HBase 连接器。 定义将架构从 Spark 映射到 HBase 的目录。 使用 RDD 或 DataFrame API 与 HBase 数据进行交互。 在 Apache HBase 中准备示例数据此步骤中,将在 Apache HBase 中创建并填充一个表,然后可使用 Spark 对其进行查询。 使用 ssh 命令连接到 HBase 群集。 编辑命令,将 HBASECLUSTER 替换为 HBase 群集的名称,然后输入该命令: ssh [email protected]使用 hbase shell 命令启动 HBase 交互式 shell。 在 SSH 连接中输入以下命令。 hbase shell使用 create 命令创建包含双列系列的 HBase 表。 输入以下命令: create 'Contacts', 'Personal', 'Office'使用 put 命令将指定列中的值插入特定表中的指定行。 输入以下命令: put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'使用 exit 命令停止 HBase 交互式 shell。 输入以下命令: exit 运行脚本以设置群集之间的连接若要设置群集之间的通信,请执行相关步骤,在群集上运行两个脚本。 这些脚本将自动执行“手动设置通信”部分所述的文件复制过程。 从 HBase 群集运行的脚本会将 hbase-site.xml 和 HBase IP 映射信息上传到 Spark 群集所附加的默认存储。 从 Spark 群集运行的脚本设置两个 cron 作业,以定期运行两个帮助器脚本: HBase cron 作业 - 将新的 hbase-site.xml 文件和 HBase IP 映射从 Spark 默认存储帐户下载到本地节点 Spark cron 作业 - 检查是否发生了 Spark 缩放以及群集是否安全。 如果是,则编辑 /etc/hosts 以包含本地存储的 HBase IP 映射注意:在继续之前,请确保已将 Spark 群集的存储帐户作为辅助存储帐户添加到了 HBase 群集。 请确保按所示顺序运行脚本。 在 HBase 群集上使用脚本操作以应用更改(考虑以下因素): properties Value Bash 脚本 URI https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-hbase.sh 节点类型 区域 parameters -s SECONDARYS_STORAGE_URL 持久化 是 SECONDARYS_STORAGE_URL 是 Spark 端默认存储的 URL。 参数示例:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net在 Spark 群集上使用脚本操作以应用更改(考虑以下因素): properties Value Bash 脚本 URI https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-spark.sh 节点类型 头、辅助角色、Zookeeper parameters -s "SPARK-CRON-SCHEDULE"(可选)-h "HBASE-CRON-SCHEDULE"(可选) 持久化 是 可以指定希望此群集自动检查更新的频率。 默认值:-s “*/1 * * * *” -h 0(在此示例中,Spark cron 每分钟运行一次,而 HBase cron 不运行) 由于默认情况下未设置 HBase cron,因此在对 HBase 群集执行缩放时需要重新运行此脚本。 如果 HBase 群集经常缩放,可以选择自动设置 HBase cron 作业。 例如:-h "*/30 * * * *" 将脚本配置为每 30 分钟执行一次检查。 这样将会定期运行 HBase cron 计划,以自动将公共存储帐户上的新 HBase 信息下载到本地节点。 手动设置通信(可选,如果上述步骤中提供的脚本失败)注意: 每当其中一个群集经历缩放活动时,都需要执行这些步骤。 将 hbase-site.xml 从本地存储复制到 Spark 群集默认存储所在的根目录。 编辑命令以反映配置。 然后,在与 HBase 群集建立的 SSH 会话中输入该命令: 语法值 新值 URI 方案 修改此值以反映存储。 语法适用于启用了安全传输的 Blob 存储。 SPARK_STORAGE_CONTAINER 替换为 Spark 群集使用的默认存储容器名称。 SPARK_STORAGE_ACCOUNT 替换为 Spark 群集使用的默认存储帐户名称。 hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/然后退出与 HBase 群集的 ssh 连接。 exit使用 SSH 连接到 Spark 集群的头节点。 编辑命令,将 SPARKCLUSTER 替换为 Spark 群集的名称,然后输入该命令: ssh [email protected]输入命令,将 hbase-site.xml 从 Spark 群集的默认存储复制到群集本地存储上的 Spark 2 配置文件夹中: sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf 运行 Spark Shell,引用 Spark HBase 连接器完成上述步骤后,你应该能够运行 Spark shell,并引用相应版本的 Spark HBase 连接器。 例如,下表列出了 HDInsight 团队当前使用的两个版本和相应的命令。 如果 HBase 和 Spark 的版本与表中指示的版本相同,则可以为群集使用相同的版本。 在与 Spark 群集建立的 SSH 会话中,输入以下命令以启动 Spark shell: Spark 版本 HDI HBase 版本 SHC 版本 Command 2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/保持此 Spark shell 实例处于打开状态,并继续定义目录和查询。 如果在 SHC Core 存储库中找不到与版本相对应的 jar,请继续阅读。 对于 Spark 和 HBase 版本的后续组合,这些项目不再在上述存储库中发布。 可以直接从 spark-hbase-connector GitHub 分支生成 jar。 例如,如果运行的是 Spark 2.4 和 HBase 2.1,请完成以下步骤: 克隆存储库: git clone https://github.com/hortonworks-spark/shc转到 branch-2.4: git checkout branch-2.4从分支生成(创建 .jar 文件): mvn clean package -DskipTests运行以下命令(请确保更改与所生成的 .jar 文件相对应的 .jar 名称): spark-shell --jars ,/usr/hdp/current/hbase-client/lib/shaded-clients/*保持此 Spark shell 实例处于打开状态,并继续执行下一部分。 定义目录和查询在此步骤中,定义一个将架构从 Apache Spark 映射到 Apache HBase 的目录对象。 在打开的 Spark Shell 中,输入以下 import 语句: import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._输入以下命令,以定义在 HBase 中创建的 Contacts 表的目录: def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin代码: 定义名为 Contacts 的 HBase 表的目录架构。 将 rowkey 标识为 key,并将 Spark 中使用的列名映射到 HBase 中使用的列族、列名和列类型。 将 Rowkey 定义为具有 rowkey 的特定列族 cf 的命名列 (rowkey)。输入命令,以定义一个在 HBase 中提供围绕 Contacts 表的 DataFrame 的方法: def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }创建 DataFrame 的实例: val df = withCatalog(catalog)查询 DataFrame: df.show()应看到如下两行数据: +------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+注册一个临时表,以便使用 Spark SQL 查询 HBase 表: df.createTempView("contacts")针对 contacts 表发出 SQL 查询: spark.sqlContext.sql("select personalName, officeAddress from contacts").show应看到如下结果: +-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+ 插入新数据若要插入新的 Contact 记录,请定义 ContactRecord 类: case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )创建 ContactRecord 的实例并将其放在一个数组中: val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContact将新数据数组保存到 HBase: sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()检查结果: df.show()应看到如下输出: +------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+通过输入以下命令关闭 spark shell: :q 后续步骤 Apache Spark HBase 连接器 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |