Hudi:初识Hudi

您所在的位置:网站首页 数仓和数据湖 Hudi:初识Hudi

Hudi:初识Hudi

2023-09-16 06:33| 来源: 网络整理| 查看: 265

是什么?

Hudi是什么?可以说Hudi是一个数据湖或是数据库,但它又不是数据湖或是数据库。笔者理解为Hudi是除开计算引擎的Hive。

众所周知,Hive是一个计算框架,但是现在我们更多的是使用Spark基于Hive对HDFS中文件提供的Schema信息和元数据进行计算,而Hive作为计算引擎的功能逐渐被忽略,更多的是将Hive视作一个“数据库”(尽管它并不是),而Hudi则是完善了Hive的这部分功能,甚至可以提供近实时的数据抽取与查询。

使用Hudi对HDFS或是其他存储系统中的文件进行管理,使用Hudi创建相应的表,一样可以使用Hive或是Spark对这些表进行计算。但是却解决了Hadoop一直臭名昭著的小文件问题,查询缓慢问题等。

Hudi具有以下特性:

快速upsert,可插入索引

以原子方式操作数据并具有回滚功能

写入器之间的快照隔离

savepoint用户数据恢复的保存点

管理文件大小,使用统计数据布局

数据行的异步压缩和柱状数据

时间轴数据跟踪血统

为什么? 为什么需要Hudi? 1.Hudi的高效率

Apache Hudi最初是由Uber开发的,旨在以高效率实现低延迟的数据库访问。自2016年8月以来已投入生产,为庞大的100PB数据湖提供了支持,其中包括对业务至关重要的表,如核心旅行,搭便车,合作伙伴。它还为多个递增的Hive ETL管道提供支持,目前已集成到Uber的数据分散系统中。使用Hudi做实时数仓是一个很好的选择,实际上阿里和顺丰也这么做了。Hudi的高效体现在数据抽取与分析上。

近实时的数据抽取:

将数据从外部系统(如数据库、日志文件、消息队列等)导入到Hadoop中,是一个众所周知的问题。大多情况下,一个数仓中不得不使用多个工具(如sqoop、canal、flume等)对不同数据源进行数据抽取。

对于传统数据库,Hudi通过Upserts(增量更新)以提高加载速度。与Hudi相比,Canal读取Mysql Binlog日志以及Sqoop增量导入这些方法就显得消耗性能却效率低下。

对于Cassandra / Voldemort / HBase这类nosql性数据库,动辄数十亿行数据量,一批次全量导入是完全不可行的。Hudi对此也提高了更高效的方法。

即使对于Kafka这样不可变的数据源,Hudi仍可帮助HDFS强制使用最小文件大小,从而通过解决Hadoop生态圈中一个诟病已久的小文件问题,以改善NameNode的运行状况。

无论对接何种数据源,Hudi都提供了数据提交的原子性,保证消费者不会受到抽取数据失败导致的影响。

近实时的数据分析:

通常情况下,为了提高访问效率会使用Kylin、Impala、Presto等即席查询工具,将Hive上的数据在Hbase上存储一份,然后通过对Hbase查询以实现高效查询。但是如果使用Hudi,则直接就可以很快的进行查询,而不必多一块开销去运行与存储Hbase。

2.Hudi可以避免小文件问题

通常情况下,Hive或Spark计算时会生成大量小文件,然后再通过一些手段将它们合并在一起,这样只能解决由小文件引起的系统可伸缩性问题,但是无法解决未合并前,对小文件进行查询时效率低下的问题。而在Hudi中,一个关键的设计是避免创建小文件,并且总是生成大小合适的文件。Hudi在 ingest/writing 上花费更多的时间,以保持查询时始终高效。与常规解决方法不同,Hudi直接在生成端避免小文件问题,使小文件无法暴露给计算引擎,也就解决了小文件的低效查询问题。

怎么做? Hudi安装

hudi安装需要通过Maven对从Git下载下来的源码进行编译。

安装maven:

1.把apache-maven-3.6.1-bin.tar.gz上传到linux的/opt/software目录下

2.解压apache-maven-3.6.1-bin.tar.gz到/opt/module/目录下面

[root@hd01 software]$ tar -zxvf apache-maven-3.6.1-bin.tar.gz -C /opt/module/

3.修改apache-maven-3.6.1的名称为maven

mv apache-maven-3.6.1/ maven

4.添加环境变量到/etc/profile中

[root@hd01 module]$ sudo vim /etc/profile #MAVEN_HOME export MAVEN_HOME=/opt/module/maven export PATH=$PATH:$MAVEN_HOME/bin

5.测试安装结果

[root@hd01 module]$ source /etc/profile [root@hd01 module]$ mvn -v

6.修改setting.xml,指定为阿里云

[root@hd01 maven]$ cd conf [root@hd01 maven]$ vim settings.xml nexus-aliyun central Nexus aliyun http://maven.aliyun.com/nexus/content/groups/public 安装Git sudo yum install git 构建Hudi [root@hd01 software]$ cd /opt/module/ [root@hd01 module]$ git clone https://github.com/apache/hudi.git && cd hudi [root@hd01 hudi]$ vim pom.xml nexus-aliyun nexus-aliyun http://maven.aliyun.com/nexus/content/groups/public/ true false [root@hd01 hudi]$ mvn clean package -DskipTests -DskipITs 使用Spark-shell对接Hudi 启动spark-shell

spark-shell启动,需要指定spark-avro模块,因为默认环境里没有,spark-avro模块版本好需要和spark版本对应,这里都是2.4.5。

[root@hd01 hudi]# spark-shell \ --packages org.apache.spark:spark-avro_2.11:2.4.5 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --jars /opt/module/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.6.0-SNAPSHOT.jar 设置表名 scala> import org.apache.hudi.QuickstartUtils._ import org.apache.hudi.QuickstartUtils._ scala> import scala.collection.JavaConversions._ import scala.collection.JavaConversions._ scala> import org.apache.spark.sql.SaveMode._ import org.apache.spark.sql.SaveMode._ scala> import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceReadOptions._ scala> import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.DataSourceWriteOptions._ scala> import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.config.HoodieWriteConfig._ scala> val tableName = "hudi_trips_cow" tableName: String = hudi_trips_cow scala> val basePath = "file:///tmp/hudi_trips_cow" basePath: String = file:///tmp/hudi_trips_cow scala> val dataGen = new DataGenerator dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@5cdd5ff9 插入数据

新增数据,生成一些数据,将其加载到DataFrame中,然后将DataFrame写入Hudi表

scala> val inserts = convertToStringList(dataGen.generateInserts(10)) scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) scala> df.write.format("hudi"). | options(getQuickstartWriteConfigs). | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | option(TABLE_NAME, tableName). | mode(Overwrite). | save(basePath)

Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查/tmp/hudi_trps_cow 路径下是否有数据生成。

[root@hd01 ~]# cd /tmp/hudi_trips_cow/ [root@hd01 hudi_trips_cow]# ls americas asia 查询数据 scala> val tripsSnapshotDF = spark. | read. | format("hudi"). | load(basePath + "/*/*/*/*") scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") scala> spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() +------------------+-------------------+-------------------+---+ | fare| begin_lon| begin_lat| ts| +------------------+-------------------+-------------------+---+ | 64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0| | 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0| | 27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0| | 93.56018115236618|0.14285051259466197|0.21624150367601136|0.0| | 43.4923811219014| 0.8779402295427752| 0.6100070562136587|0.0| | 66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0| |34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0| | 41.06290929046368| 0.8192868687714224| 0.651058505660742|0.0| +------------------+-------------------+-------------------+---+ scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show() +-------------------+--------------------+----------------------+---------+----------+------------------+ |_hoodie_commit_time| _hoodie_record_key|_hoodie_partition_path| rider| driver| fare| +-------------------+--------------------+----------------------+---------+----------+------------------+ | 20200701105144|6007a624-d942-4e0...| americas/united_s...|rider-213|driver-213| 64.27696295884016| | 20200701105144|db7c6361-3f05-48d...| americas/united_s...|rider-213|driver-213| 33.92216483948643| | 20200701105144|dfd0e7d9-f10c-468...| americas/united_s...|rider-213|driver-213|19.179139106643607| | 20200701105144|e36365c8-5b3a-415...| americas/united_s...|rider-213|driver-213| 27.79478688582596| | 20200701105144|fb92c00e-dea2-48e...| americas/united_s...|rider-213|driver-213| 93.56018115236618| | 20200701105144|98be3080-a058-47d...| americas/brazil/s...|rider-213|driver-213| 43.4923811219014| | 20200701105144|3dd6ef72-4196-469...| americas/brazil/s...|rider-213|driver-213| 66.62084366450246| | 20200701105144|20f9463f-1c14-4e6...| americas/brazil/s...|rider-213|driver-213|34.158284716382845| | 20200701105144|1585ad3a-11c9-43c...| asia/india/chennai|rider-213|driver-213|17.851135255091155| | 20200701105144|d40daa90-cf1a-4d1...| asia/india/chennai|rider-213|driver-213| 41.06290929046368| +-------------------+--------------------+----------------------+---------+----------+------------------+

由于测试数据分区是 区域/国家/城市,所以load(basePath  “/*/*/*/*”)

增量查询

Hudi还提供了获取自给定提交时间戳以来以更改记录流的功能。这可以通过使用Hudi的增量查询并提供开始流进行更改的开始时间来实现。

scala> spark. | read. | format("hudi"). | load(basePath + "/*/*/*/*"). | createOrReplaceTempView("hudi_trips_snapshot") scala> val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) scala> val beginTime = commits(commits.length - 2) beginTime: String = 20200701105144 scala> val tripsIncrementalDF = spark.read.format("hudi"). | option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). | option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). | load(basePath) scala> tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show() +-------------------+------------------+--------------------+-------------------+---+ |_hoodie_commit_time| fare| begin_lon| begin_lat| ts| +-------------------+------------------+--------------------+-------------------+---+ | 20200701110546|49.527694252432056| 0.5142184937933181| 0.7340133901254792|0.0| | 20200701110546| 90.9053809533154| 0.19949323322922063|0.18294079059016366|0.0| | 20200701110546| 98.3428192817987| 0.3349917833248327| 0.4777395067707303|0.0| | 20200701110546| 90.25710109008239| 0.4006983139989222|0.08528650347654165|0.0| | 20200701110546| 63.72504913279929| 0.888493603696927| 0.6570857443423376|0.0| | 20200701110546| 29.47661370147079|0.010872312870502165| 0.1593867607188556|0.0| +-------------------+------------------+--------------------+-------------------+---+

这将提供在beginTime提交后的数据,并且fare>20的数据

时间点查询

根据特定时间查询,可以将endTime指向特定时间,beginTime指向000(表示最早提交时间)

scala> val beginTime = "000" beginTime: String = 000 scala> val endTime = commits(commits.length - 2) endTime: String = 20200701105144 scala> val tripsPointInTimeDF = spark.read.format("hudi"). | option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). | option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). | option(END_INSTANTTIME_OPT_KEY, endTime). | load(basePath) scala> tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show() +-------------------+------------------+-------------------+-------------------+---+ |_hoodie_commit_time| fare| begin_lon| begin_lat| ts| +-------------------+------------------+-------------------+-------------------+---+ | 20200701105144| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0| | 20200701105144| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0| | 20200701105144| 27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0| | 20200701105144| 93.56018115236618|0.14285051259466197|0.21624150367601136|0.0| | 20200701105144| 43.4923811219014| 0.8779402295427752| 0.6100070562136587|0.0| | 20200701105144| 66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0| | 20200701105144|34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0| | 20200701105144| 41.06290929046368| 0.8192868687714224| 0.651058505660742|0.0| +-------------------+------------------+-------------------+-------------------+---+ 删除数据

只有append模式,才支持删除功能

scala> spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count() res12: Long = 10 scala> val ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2) scala> val deletes = dataGen.generateDeletes(ds.collectAsList()) scala> val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2)); scala> df.write.format("hudi"). | options(getQuickstartWriteConfigs). | option(OPERATION_OPT_KEY,"delete"). | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | option(TABLE_NAME, tableName). | mode(Append). | save(basePath) scala> val roAfterDeleteViewDF = spark. | read. | format("hudi"). | load(basePath + "/*/*/*/*") scala> roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") scala> spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count() res15: Long = 8

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3