数据湖架构Hudi(五)Hudi集成Flink案例详解

您所在的位置:网站首页 plink19 数据湖架构Hudi(五)Hudi集成Flink案例详解

数据湖架构Hudi(五)Hudi集成Flink案例详解

2023-03-14 02:48| 来源: 网络整理| 查看: 265

五、Hudi集成Flink案例详解 5.1 hudi集成flink

flink的下载地址:

https://archive.apache.org/dist/flink/

HudiSupported Flink version0.12.x1.15.x、1.14.x、1.13.x0.11.x1.14.x、1.13.x0.10.x1.13.x0.9.01.12.2 将上述编译好的安装包拷贝到flink下的jars目录中: cp /opt/apps/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle-0.12.0.jar /opt/apps/flink-1.13.6/lib/ 拷贝guava包,解决依赖冲突 cp /opt/apps/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/apps/flink-1.13.6/lib/ 配置Hadoop环境变量 vim /etc/profile.d/my_env.sh export HADOOP_CLASSPATH=`hadoop classpath` export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop source /etc/profile.d/my_env.sh 5.2 sql-client之yarn-session模式

配置hadoop调度器yarn

mapred-site.xml mapreduce.framework.name yarn yarn.app.mapreduce.am.env HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3 mapreduce.map.env HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3 mapreduce.reduce.env HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3 yarn-site.xml yarn.resourcemanager.hostname centos04 yarn.nodemanager.aux-services mapreduce_shuffle hadoop-env.sh # 在最后面添加如下: export YARN_RESOURCEMANAGER_USER=root export YARN_NODEMANAGER_USER=root # 记得配置sql-client-defaults.yaml 5.2.1 启动 # 1、修改配置文件 vim /opt/apps/flink-1.13.6/conf/flink-conf.yaml classloader.check-leaked-classloader: false taskmanager.numberOfTaskSlots: 4 state.backend: rocksdb execution.checkpointing.interval: 30000 # 开启ck,才能快速从内存中flush出去 state.checkpoints.dir: hdfs://centos04:9000/ckps state.backend.incremental: true # 2、yarn-session模式启动 # 解决依赖问题 cp /opt/apps/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/apps/flink-1.13.6/lib/ # 启动yarn-session /opt/apps/flink-1.13.6/bin/yarn-session.sh -d # 启动sql-client /opt/apps/flink-1.13.6/bin/sql-client.sh embedded -s yarn-session 5.2.2 插入数据 set sql-client.execution.result-mode=tableau; -- 创建hudi表 CREATE TABLE t1( uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED, name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://centos04:9000/tmp/hudi_flink/t1', 'table.type' = 'MERGE_ON_READ' -- 默认是COW ); 或如下写法 CREATE TABLE t1( uuid VARCHAR(20), name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20), PRIMARY KEY(uuid) NOT ENFORCED ) PARTITIONED BY (`partition`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://centos04:9000/tmp/hudi_flink/t1', 'table.type' = 'MERGE_ON_READ' ); -- 插入数据 INSERT INTO t1 VALUES ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'), ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'), ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'), ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'), ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'), ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'), ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'), ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4'); -- 查询数据 select * from t1; 5.2.3 流式插入 -- 1、创建测试表 CREATE TABLE sourceT ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), `partition` varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); create table t2( uuid varchar(20), name varchar(10), age int, ts timestamp(3), `partition` varchar(20) ) with ( 'connector' = 'hudi', 'path' = '/tmp/hudi_flink/t2', 'table.type' = 'MERGE_ON_READ' ); -- 2、执行插入 insert into t2 select * from sourceT; 查询结果 set sql-client.execution.result-mode=tableau; Flink SQL> select * from t2 limit 10; -- 会产生一个collect的flink任务,拉取10条数据,注意:不是流读取 2023-03-06 22:45:10,403 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false 2023-03-06 22:45:12,897 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at centos04/192.168.42.104:8032 2023-03-06 22:45:12,899 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2023-03-06 22:45:12,918 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface centos04:45452 of application 'application_1678113536312_0001'. +----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+ | op | uuid | name | age | ts | partition | +----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+ | +I | d0523c31d3da5b8e2a8ff676dcf... | 327db70824413c5dcde0a7ac10c... | 1971040768 | 2023-03-06 14:40:58.780 | 42b45346672bf719b5393232763... | | +I | cfc07cbebf6890a04942ec88947... | 36fc7a58aab88835f11b3b51a40... | -12199364 | 2023-03-06 14:41:05.781 | e33c02173f4c744fb9c1c68e774... | | +I | 668b204a933494a89b829c76bc6... | aa9ff2109457fdcd5f099b8ce98... | 2061449955 | 2023-03-06 14:41:14.780 | 680514e53b196324423cd12cda5... | | +I | 95fe7878909a801c2726f1d05f5... | 1c86b29fe313e557688df0ba950... | 519997290 | 2023-03-06 14:41:11.781 | b9817c52301ab4614c3053c9ccc... | | +I | 8661c25c8c930f4660fbefa867e... | 01a2bee6b99064c7bca9513ca37... | -682830738 | 2023-03-06 14:41:32.781 | 16ab837502a31e208b06bb74efd... | | +I | 55ce03895e229b29546dbdd2ff3... | 77f2552de13337e8092c1445654... | 2011273584 | 2023-03-06 14:41:09.780 | 3fd688cfa17b2a3a6fd3ffac6bd... | | +I | 50c23f315d736c313b652b34fc5... | 4f9c84ff75466fba8e800daabd0... | -190184764 | 2023-03-06 14:42:26.780 | 7f2a07a1007b2fbfea8cbb2062e... | | +I | 8073e8c70a9bc0e79c2e69aa885... | 30bf89c80d9ab0f0a8f5f883ee6... | -1639873427 | 2023-03-06 14:41:24.781 | 15df7d527d6d7edae496e76d02f... | | +I | 29a61b7cd348d08498d2b089a5d... | 77a63ca7a2e77e6d167de20c673... | 71527378 | 2023-03-06 14:42:14.781 | 2842db44a691f4f1d597ac79086... | | +I | e5defc24191f60557644b7d14e2... | 56bdd04424b8f422d4075ade510... | 1054223989 | 2023-03-06 14:40:42.781 | e8d2d3c6fed90d37b15647d1ecd... | +----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+

在这里插入图片描述

在这里插入图片描述

5.3 使用IDEA开发

除了用sql-client,还可以自己编写FlinkSQL程序,打包提交Flink作业。

1、首先,需要将hudi集成flink的jar包,装载到本地的仓库,命令如下:

D:\bigdata\hudi从入门到精通\apps>mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar [INFO] Scanning for projects... [INFO] [INFO] ------------------------------------- [INFO] Building Maven Stub Project (No POM) 1 [INFO] --------------------------------[ pom ]--------------------------------- [INFO] [INFO] --- maven-install-plugin:2.4:install-file (default-cli) @ standalone-pom --- [INFO] Installing D:\bigdata\hudi从入门到精通\apps\hudi-flink1.13-bundle-0.12.0.jar to D:\doit\apps\repository\org\apache\hudi\hudi-flink_2.12\0.12.0\hudi-flink_2.12-0.12.0.jar [INFO] Installing C:\Users\Undo\AppData\Local\Temp\mvninstall50353756903805721.pom to D:\doit\apps\repository\org\apache\hudi\hudi-flink_2.12\0.12.0\hudi-flink_2.12-0.12.0.pom [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.111 s [INFO] Finished at: 2023-03-02T10:08:15+08:00 [INFO] ------------------------------------------------------------------------

2、导入pom文件

hudi-start com.yyds 1.0-SNAPSHOT 4.0.0 hudi-flink 8 8 1.13.6 0.12.0 1.8 2.12 1.7.30 org.apache.flink flink-java ${flink.version} provided org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} provided org.apache.flink flink-clients_${scala.binary.version} ${flink.version} provided org.apache.flink flink-table-planner-blink_${scala.binary.version} ${flink.version} provided org.apache.flink flink-runtime-web_${scala.binary.version} ${flink.version} provided org.slf4j slf4j-api ${slf4j.version} provided org.slf4j slf4j-log4j12 ${slf4j.version} provided org.apache.logging.log4j log4j-to-slf4j 2.14.0 provided org.apache.flink flink-statebackend-rocksdb_${scala.binary.version} ${flink.version} org.apache.hadoop hadoop-client 3.1.3 provided org.apache.hudi hudi-flink_2.12 ${hudi.version} provided org.apache.maven.plugins maven-shade-plugin 3.2.4 package shade com.google.code.findbugs:jsr305 org.slf4j:* log4j:* org.apache.hadoop:* *:* META-INF/*.SF META-INF/*.DSA META-INF/*.RSA package com.yyds.hudi.flink; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.PredefinedOptions; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.util.concurrent.TimeUnit; public class HudiTest { public static void main(String[] args) { System.setProperty("HADOOP_USER_NAME","root"); // 1、创建flinksql的执行环境 Configuration conf = new Configuration(); conf.setString(RestOptions.BIND_PORT, "8081-8089"); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env); // 注意:需要设置check-point // 设置状态后端RocksDB EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true); embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM); env.setStateBackend(embeddedRocksDBStateBackend); // checkpoint配置 env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30), CheckpointingMode.EXACTLY_ONCE); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.setCheckpointStorage("hdfs://centos04:9000/ckps"); checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20)); checkpointConfig.setTolerableCheckpointFailureNumber(5); checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1)); checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 2、使用flink自带connector模拟数据 tabEnv.executeSql("CREATE TABLE sourceT (\n" + " uuid varchar(20),\n" + " name varchar(10),\n" + " age int,\n" + " ts timestamp(3),\n" + " `partition` varchar(20)\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1'\n" + ")"); // 3、创建hudi表 tabEnv.executeSql("create table t2(\n" + " uuid varchar(20),\n" + " name varchar(10),\n" + " age int,\n" + " ts timestamp(3),\n" + " `partition` varchar(20)\n" + ")\n" + "with (\n" + " 'connector' = 'hudi',\n" + // 指定connector为hudi " 'path' = 'hdfs://192.168.42.104:9000/datas/hudi_warehouse/hudi_flink/t2',\n" + " 'table.type' = 'MERGE_ON_READ'\n" + // MOR类型的表 ")"); // 4、将模拟产生的数据,写入到Hudi表中 tabEnv.executeSql("insert into t2 select * from sourceT"); } }

jar包运行

bin/flink run -t yarn-per-job \ -c com.yyds.hudi.flink.HudiTest \ ./myjars/hudi-flink-1.0-SNAPSHOT.jar

类型映射

Flink SQL TypeHudi TypeAvro logical typeCHAR / VARCHAR / STRINGstringBOOLEANbooleanBINARY / VARBINARYbytesDECIMALfixeddecimalTINYINTintSMALLINTintINTintBIGINTlongFLOATfloatDOUBLEdoubleDATEintdateTIMEinttime-millisTIMESTAMPlongtimestamp-millisARRAYarrayMAP(key must be string/char/varchar type)mapMULTISET(element must be string/char/varchar type)mapROWrecord 5.4 hudi核心参数 5.4.1 去重参数 -- 通过如下语法设置主键: -- 设置单个主键 create table hoodie_table ( f0 int primary key not enforced, f1 varchar(20), ... ) with ( 'connector' = 'hudi', ... ) -- 设置联合主键 create table hoodie_table ( f0 int, f1 varchar(20), ... primary key(f0, f1) not enforced ) with ( 'connector' = 'hudi', ... ) 名称说明默认值备注hoodie.datasource.write.recordkey.field主键字段–支持主键语法 PRIMARY KEY 设置,支持逗号分隔的多个字段precombine.field(0.13.0 之前版本为 write.precombine.field)去重时间字段–record 合并的时候会按照该字段排序,选值较大的 record 为合并结果;不指定则为处理序:选择后到的 record 5.4.2 并发参数 名称说明默认值备注write.taskswriter 的并发,每个 writer 顺序写 1~N 个 buckets4增加并发对小文件个数没影响write.bucket_assign.tasksbucket assigner 的并发Flink的并行度增加并发同时增加了并发写的 bucekt 数,也就变相增加了小文件(小 bucket) 数write.index_bootstrap.tasksIndex bootstrap 算子的并发,增加并发可以加快 bootstrap 阶段的效率,bootstrap 阶段会阻塞 checkpoint,因此需要设置多一些的 checkpoint 失败容忍次数Flink的并行度只在 index.bootstrap.enabled 为 true 时生效read.tasks读算子的并发(batch 和 stream)4compaction.tasksonline compaction 算子的并发writer 的并发online compaction 比较耗费资源,建议走 offline compaction

可以flink建表时在with中指定,或Hints临时指定参数的方式:在需要调整的表名后面加上 /*+ OPTIONS() */

案例如下:

insert into t2 /*+ OPTIONS('write.tasks'='2','write.bucket_assign.tasks'='3','compaction.tasks'='4') */ select * from sourceT; # 从下图可以看出,writer 的并发变成了2,bucket assigner 的并发变成了3,compaction_task 变成了4

在这里插入图片描述

在这里插入图片描述 可以参考下面Hudi表读取原理,看上图。

5.4.3 压缩参数

​ 在线压缩的参数,通过设置 compaction.async.enabled =false关闭在线压缩执行,但是调度compaction.schedule.enabled 仍然建议开启(即上图的compact_plan_generate步骤),之后通过离线压缩直接执行 在线压缩任务 阶段性调度的压缩 plan。

名称说明默认值备注compaction.schedule.enabled是否阶段性生成压缩 plantrue建议开启,即使compaction.async.enabled 关闭的情况下compaction.async.enabled是否开启异步压缩true通过关闭此参数关闭在线压缩compaction.tasks压缩 task 并发4compaction.trigger.strategy压缩策略num_commits支持四种策略:num_commits、time_elapsed、num_and_time、num_or_timecompaction.delta_commits默认策略,5 个 commits 压缩一次5compaction.delta_seconds3600compaction.max_memory压缩去重的 hash map 可用内存100(MB)资源够用的话建议调整到 1GBcompaction.target_io每个压缩 plan 的 IO 上限,默认 5GB500(GB)

案例如下:

CREATE TABLE t3( uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED, name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://centos04:9000/tmp/hudi_flink/t3', 'compaction.async.enabled' = 'true', -- 异步在线压缩 'compaction.tasks' = '1', 'compaction.schedule.enabled' = 'true', -- 生成压缩 plan 'compaction.trigger.strategy' = 'num_commits', -- 压缩策略,安装commit次数进行压缩 'compaction.delta_commits' = '2', -- 2次进行压缩 'table.type' = 'MERGE_ON_READ' ); set table.dynamic-table-options.enabled=true; insert into t3 select * from sourceT/*+ OPTIONS('rows-per-second' = '5') */; -- 从hdfs上可以看到,flink发生两次ck,delta_commit提交两次后,将log文件进行压缩,然后生成了parquet文件。

在这里插入图片描述

5.4.4 文件大小

​ Hudi会自管理文件大小,避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用。在进行insert/upsert操作时,Hudi可以将文件大小维护在一个指定文件大小。

​ 目前只有 log 文件的写入大小可以做到精确控制,parquet 文件大小按照估算值。

名称说明默认值备注hoodie.parquet.max.file.size最大可写入的 parquet 文件大小120 * 1024 * 1024默认 120MB(单位 byte)超过该大小切新的 file grouphoodie.logfile.to.parquet.compression.ratiolog文件大小转 parquet 的比率0.35hoodie 统一依据 parquet 大小来评估小文件策略hoodie.parquet.small.file.limit在写入时,hudi 会尝试先追加写已存小文件,该参数设置了小文件的大小阈值,小于该参数的文件被认为是小文件104857600默认 100MB(单位 byte)大于 100MB,小于 120MB 的文件会被忽略,避免写过度放大hoodie.copyonwrite.record.size.estimate预估的 record 大小,hoodie 会依据历史的 commits 动态估算 record 的大小,但是前提是之前有单次写入超过 hoodie.parquet.small.file.limit 大小,在未达到这个大小时会使用这个参数1024默认 1KB(单位 byte)如果作业流量比较小,可以设置下这个参数hoodie.logfile.max.sizeLogFile最大大小。这是在将Log滚转到下一个版本之前允许的最大大小。1073741824默认1GB(单位 byte)

案例如下:

CREATE TABLE t4( uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED, name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://centos04:9000/tmp/hudi_flink/t4', 'compaction.tasks' = '1', 'hoodie.parquet.max.file.size'= '10000', -- 最大可写入的 parquet 文件大小,设置为10 KB 'hoodie.parquet.small.file.limit'='5000', -- 小文件的大小阈值,小于该参数的文件被认为是小文件 设置为5KB 'table.type' = 'MERGE_ON_READ' ); set table.dynamic-table-options.enabled=true; insert into t4 select * from sourceT /*+ OPTIONS('rows-per-second' = '5')*/; 5.4.5 hadoop参数

从 0.12.0 开始支持,如果有跨集群提交执行的需求,可以通过 sql 的 ddl 指定 per-job 级别的 hadoop 配置

名称说明默认值备注hadoop.${you option key}通过 hadoop.前缀指定 hadoop 配置项–支持同时指定多个 hadoop 配置项 5.5 内存优化 5.5.1 内存参数 名称说明默认值备注write.task.max.size一个 write task 的最大可用内存1024当前预留给 write buffer 的内存为write.task.max.size -compaction.max_memory当 write task 的内存 buffer达到阈值后会将内存里最大的 buffer flush 出去write.batch.sizeFlink 的写 task 为了提高写数据效率,会按照写 bucket 提前 buffer 数据,每个 bucket 的数据在内存达到阈值之前会一直 cache 在内存中,当阈值达到会把数据 buffer 传递给 hoodie 的 writer 执行写操作256一般不用设置,保持默认值就好write.log_block.sizehoodie 的 log writer 在收到 write task 的数据后不会马上 flush 数据,writer 是以 LogBlock 为单位往磁盘刷数据的,在 LogBlock 攒够之前 records 会以序列化字节的形式 buffer 在 writer 内部128一般不用设置,保持默认值就好write.merge.max_memoryhoodie 在 COW 写操作的时候,会有增量数据和 base file 数据 merge 的过程,增量的数据会缓存在内存的 map 结构里,这个 map 是可 spill 的,这个参数控制了 map 可以使用的堆内存大小100一般不用设置,保持默认值就好compaction.max_memory同 write.merge.max_memory: 100MB 类似,只是发生在压缩时。100如果是 online compaction,资源充足时可以开大些,比如 1GB 5.5.2 MOR (1)state backend 换成 rocksdb (默认的 in-memory state-backend 非常吃内存) (2)内存够的话,compaction.max_memory 调大些 (默认是 100MB 可以调到 1GB) (3)关注 TM 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的大小,比如 TM 的内存是 4GB 跑了 2 个 StreamWriteFunction 那每个 write function 能分到 2GB,尽量预留一些 buffer,因为网络 buffer,TM 上其他类型 task (比如 BucketAssignFunction 也会吃些内存) (4)需要关注 compaction 的内存变化,compaction.max_memory 控制了每个 compaction task 读 log 时可以利用的内存大小,compaction.tasks 控制了 compaction task 的并发 注意: write.task.max.size - compaction.max_memory 是预留给每个 write task 的内存 buffer 5.5.3 COW (1)state backend 换成 rocksdb(默认的 in-memory state-backend 非常吃内存)。 (2)write.task.max.size 和 write.merge.max_memory 同时调大(默认是 1GB 和 100MB 可以调到 2GB 和 1GB)。 (3)关注 TM 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的大小,比如 TM 的内存是 4GB 跑了 2 个 StreamWriteFunction 那每个 write function 能分到 2GB,尽量预留一些 buffer,因为网络 buffer,TM 上其他类型 task(比如 BucketAssignFunction 也会吃些内存)。 注意:write.task.max.size - write.merge.max_memory 是预留给每个 write task 的内存 buffer。 5.6 读取方式 5.6.1 流读

​ 当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。

名称Required默认值说明read.streaming.enabledfalsefalse设置 true 开启流读模式read.start-commitfalse最新 commit指定 ‘yyyyMMddHHmmss’ 格式的起始 commit(闭区间)read.streaming.skip_compactionfalsefalse流读时是否跳过 compaction 的 commits,跳过 compaction 有两个用途:1)避免 upsert 语义下重复消费 (compaction 的 instant 为重复数据,如果不跳过,有小概率会重复消费) 2) changelog 模式下保证语义正确性 0.11 开始,以上两个问题已经通过保留 compaction 的 instant time 修复clean.retain_commitsfalse10cleaner 最多保留的历史 commits 数,大于此数量的历史 commits 会被清理掉,changelog 模式下,这个参数可以控制 changelog 的保留时间,例如 checkpoint 周期为 5 分钟一次,默认最少保留 50 分钟的时间。 set sql-client.execution.result-mode=tableau; CREATE TABLE t5( uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED, name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://centos04:9000/tmp/hudi_flink/t5', 'table.type' = 'MERGE_ON_READ', 'read.streaming.enabled' = 'true', 'read.streaming.check-interval' = '4' -- 默认60s ); insert into t5 select * from sourceT; select * from t5; -- 如下图,能够不断的获取数据

在这里插入图片描述

5.6.2 增量读取 从 0.10.0 开始支持。 如果有增量读取 batch 数据的需求,增量读取包含三种场景。 (1)Stream 增量消费,通过参数 read.start-commit 指定起始消费位置; (2)Batch 增量消费,通过参数 read.start-commit 指定起始消费位置,通过参数 read.end-commit 指定结束消费位置,区间为闭区间,即包含起始、结束的 commit (3)TimeTravel:Batch 消费某个时间点的数据:通过参数 read.end-commit 指定结束消费位置即可(由于起始位置默认从最新,所以无需重复声明) 名称Required默认值说明read.start-commitfalse默认从最新 commit支持 earliest 从最早消费read.end-commitfalse默认到最新 commit 5.6.3 限流

​ 如果将全量数据(百亿数量级) 和增量先同步到 kafka,再通过 flink 流式消费的方式将库表数据直接导成 hoodie 表,因为直接消费全量部分数据:量大(吞吐高)、乱序严重(写入的 partition 随机),会导致写入性能退化,出现吞吐毛刺,这时候可以开启限速参数,保证流量平稳写入。

名称Required默认值说明write.rate.limitfalse0默认关闭限速 5.7 写入方式 5.7.1 通过flink-cdc进行写入

CDC 数据保存了完整的数据库变更,当前可通过两种途径将数据导入 hudi

在这里插入图片描述

第一种:通过 cdc-connector 直接对接 DB 的 binlog 将数据导入 hudi,优点是不依赖消息队列,缺点是对 db server 造成压力。 第二种:对接 cdc format 消费 kafka 数据导入 hudi,优点是可扩展性强,缺点是依赖 kafka。 注意:如果上游数据无法保证顺序,需要指定 write.precombine.field 字段。

1)准备MySQL表

(1)MySQL开启binlog

(2)建表

create database test; use test; create table stu3 ( id int unsigned auto_increment primary key COMMENT '自增id', name varchar(20) not null comment '学生名字', school varchar(20) not null comment '学校名字', nickname varchar(20) not null comment '学生小名', age int not null comment '学生年龄', class_num int not null comment '班级人数', phone bigint not null comment '电话号码', email varchar(64) comment '家庭网络邮箱', ip varchar(32) comment 'IP地址' ) engine=InnoDB default charset=utf8;

2)flink读取mysql binlog并写入kafka

(1)创建MySQL表

create table stu3_binlog( id bigint not null, name string, school string, nickname string, age int not null, class_num int not null, phone bigint not null, email string, ip string, primary key (id) not enforced ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'centos01', 'port' = '3306', 'username' = 'root', 'password' = 'root', 'database-name' = 'test', 'table-name' = 'stu3' );

(2)创建Kafka表

create table stu3_binlog_sink_kafka( id bigint not null, name string, school string, nickname string, age int not null, class_num int not null, phone bigint not null, email string, ip string, primary key (id) not enforced ) with ( 'connector' = 'upsert-kafka' ,'topic' = 'cdc_mysql_stu3_sink' ,'properties.zookeeper.connect' = 'centos01:2181' ,'properties.bootstrap.servers' = 'centos01:9092' ,'key.format' = 'json' ,'value.format' = 'json' );

(3)将mysql binlog日志写入kafka

insert into stu3_binlog_sink_kafka select * from stu3_binlog;

3)flink读取kafka数据并写入hudi数据湖

(1)创建kafka源表

create table stu3_binlog_source_kafka( id bigint not null, name string, school string, nickname string, age int not null, class_num int not null, phone bigint not null, email string, ip string ) with ( 'connector' = 'kafka', 'topic' = 'cdc_mysql_stu3_sink', 'properties.bootstrap.servers' = 'hadoop1:9092', 'format' = 'json', 'scan.startup.mode' = 'earliest-offset', 'properties.group.id' = 'testGroup' );

(2)创建hudi目标表

create table stu3_binlog_sink_hudi( id bigint not null, name string, `school` string, nickname string, age int not null, class_num int not null, phone bigint not null, email string, ip string, primary key (id) not enforced ) partitioned by (`school`) with ( 'connector' = 'hudi', 'path' = 'hdfs://centos04:9000/tmp/hudi_flink/stu3_binlog_sink_hudi', 'table.type' = 'MERGE_ON_READ', 'write.option' = 'insert', 'write.precombine.field' = 'school' );

(3)将kafka数据写入到hudi中

insert into stu3_binlog_sink_hudi select * from stu3_binlog_source_kafka; 5.7.2 离线批量导入 如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。 (1)批量导入省去了 avro 的序列化以及数据的 merge 过程,后续不会再有去重操作,数据的唯一性需要自己来保证。 (2)bulk_insert 需要在 Batch Execuiton Mode 下执行更高效,Batch 模式默认会按照 partition path 排序输入消息再写入 Hoodie,避免 file handle 频繁切换导致性能下降。 SET execution.runtime-mode = batch; SET execution.checkpointing.interval = 0; (3)bulk_insert write task 的并发通过参数 write.tasks 指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task 的并发数就是划分的 bucket 数,当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会 roll over 到新的文件句柄,所以最后:写文件数量 >= bulk_insert write task 数。 名称Required默认值说明write.operationTRUEupsert配置 bulk_insert 开启该功能write.tasksFALSE4bulk_insert 写 task 的并发,最后的文件数 >=write.taskswrite.bulk_insert.shuffle_by_partitionwrite.bulk_insert.shuffle_input(从 0.11 开始)FALSETRUE是否将数据按照 partition 字段 shuffle 再通过 write task 写入,开启该参数将减少小文件的数量 但是可能有数据倾斜风险write.bulk_insert.sort_by_partitionwrite.bulk_insert.sort_input(从 0.11 开始)FALSETRUE是否将数据线按照 partition 字段排序再写入,当一个 write task 写多个 partition,开启可以减少小文件数量write.sormory128sort 算子的可用 managed memory(单位 MB) 5.7.3 全量接增量

如果已经有全量的离线 Hoodie 表,需要接上实时写入,并且保证数据不重复,可以开启 index bootstrap 功能。

如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)。

名称Required默认值说明index.bootstrap.enabledtruefalse开启索引加载,会将已存表的最新数据一次性加载到 state 中index.partition.regexfalse*设置正则表达式进行分区筛选,默认为加载全部分区 使用流程 (1) CREATE TABLE 创建和 Hoodie 表对应的语句,注意 table type 要正确 (2)设置 index.bootstrap.enabled = true开启索引加载功能 (3)重启任务将 index.bootstrap.enabled 关闭,参数配置到合适的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并发不同,可以重启避免 shuffle 说明: 索引加载为并发加载,根据数据量大小加载时间不同,可以在log中搜索 finish loading the index under partition 和 Load records from file 日志来观察索引加载的进度 5.8 写入模式 5.8.1 Changelog 模式

​ 如果希望 Hoodie 保留消息的所有变更(I/-U/U/D),之后接上 Flink 引擎的有状态计算实现全链路近实时数仓生产(增量计算),Hoodie 的 MOR 表通过行存原生支持保留消息的所有变更(format 层面的集成),通过流读 MOR 表可以消费到所有的变更记录。

1)WITH 参数

名称Required默认值说明changelog.enabledfalsefalse默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被 merge 掉;改成 true 支持消费所有变更。

​ 批(快照)读仍然会合并所有的中间结果,不管 format 是否已存储中间状态。

​ 开启 changelog.enabled 参数后,中间的变更也只是 Best Effort: 异步的压缩任务会将中间变更合并成 1 条,所以如果流读消费不够及时,被压缩后只能读到最后一条记录。当然,通过调整压缩的 buffer 时间可以预留一定的时间 buffer 给 reader,比如调整压缩的两个参数:

​ Ø compaction.delta_commits:5

​ Ø compaction.delta_seconds: 3600。

说明:

Changelog 模式开启流读的话,要在 sql-client 里面设置参数:

set sql-client.execution.result-mode=tableau;

或者

set sql-client.execution.result-mode=changelog;

否则中间结果在读的时候会被直接合并。

2)流读 changelog

仅在 0.10.0 支持,本 feature 为实验性。

开启 changelog 模式后,hudi 会保留一段时间的 changelog 供下游 consumer 消费,我们可以通过流读 ODS 层 changelog 接上 ETL 逻辑写入到 DWD 层,如下图的 pipeline: 在这里插入图片描述

​ 流读的时候我们要注意 changelog 有可能会被 compaction 合并掉,中间记录会消除,可能会影响计算结果,需要关注sql-client的属性(result-mode)同上。

3)案例演示

(1)使用changelog

set sql-client.execution.result-mode=tableau; CREATE TABLE t6( id int, ts int, primary key (id) not enforced ) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://centos04:9000/tmp/hudi_flink/t6', 'table.type' = 'MERGE_ON_READ', 'read.streaming.enabled' = 'true', 'read.streaming.check-interval' = '4', 'changelog.enabled' = 'true' ); insert into t6 values (1,1); insert into t6 values (1,2); set table.dynamic-table-options.enabled=true; select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/; select count(*) from t6/*+ OPTIONS('read.start-commit'='earliest')*/;

(2)不使用changelog

CREATE TABLE t6_v( id int, ts int, primary key (id) not enforced ) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://centos04:9000/tmp/hudi_flink/t6', 'table.type' = 'MERGE_ON_READ', 'read.streaming.enabled' = 'true', 'read.streaming.check-interval' = '4' ); select * from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/; select count(*) from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/; 5.8.2 Append 模式

从 0.10 开始支持

对于 INSERT 模式:

​ Ø MOR 默认会 apply 小文件策略: 会追加写 avro log 文件

​ Ø COW 每次直接写新的 parquet 文件,没有小文件策略

Hudi 支持丰富的 Clustering 策略,优化 INSERT 模式下的小文件问题:

1)Inline Clustering

只有 Copy On Write 表支持该模式

名称Required默认值说明write.insert.clusterfalsefalse是否在写入时合并小文件,COW 表默认 insert 写不合并小文件,开启该参数后,每次写入会优先合并之前的小文件(不会去重),吞吐会受影响

2) Async Clustering

​ 从 0.12 开始支持

(1)WITH参数

名称Required默认值说明clustering.schedule.enabledfalsefalse是否在写入时定时异步调度 clustering plan,默认关闭clustering.delta_commitsfalse4调度 clsutering plan 的间隔 commits,clustering.schedule.enabled 为 true 时生效clustering.async.enabledfalsefalse是否异步执行 clustering plan,默认关闭clustering.tasksfalse4Clustering task 执行并发clustering.plan.strategy.target.file.max.bytesfalse1024 * 1024 * 1024Clustering 单文件目标大小,默认 1GBclustering.plan.strategy.small.file.limitfalse600小于该大小的文件才会参与 clustering,默认600MBclustering.plan.strategy.sort.columnsfalseN/A支持指定特殊的排序字段clustering.plan.partition.filter.modefalseNONE支持NONE:不做限制RECENT_DAYS:按时间(天)回溯SELECTED_PARTITIONS:指定固定的 partitionclustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效,默认 2 天

(2)Clustering Plan Strategy

​ 支持定制化的 clustering 策略。

名称Required默认值说明clustering.plan.partition.filter.modefalseNONE支持· NONE:不做限制· RECENT_DAYS:按时间(天)回溯· SELECTED_PARTITIONS:指定固定的 partitionclustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效,默认 2 天clustering.plan.strategy.cluster.begin.partitionfalseN/ASELECTED_PARTITIONS 生效,指定开始 partition(inclusive)clustering.plan.strategy.cluster.end.partitionfalseN/ASELECTED_PARTITIONS 生效,指定结束 partition(incluseve)clustering.plan.strategy.partition.regex.patternfalseN/A正则表达式过滤 partitionsclustering.plan.strategy.partition.selectedfalseN/A显示指定目标 partitions,支持逗号 , 分割多个 partition 5.9 Bucket索引

​ 默认的 flink 流式写入使用 state 存储索引信息:primary key 到 fileId 的映射关系。当数据量比较大的时候,state的存储开销可能成为瓶颈,bucket 索引通过固定的 hash 策略,将相同 key 的数据分配到同一个 fileGroup 中,避免了索引的存储和查询开销。

名称Required默认值说明index.typefalseFLINK_STATE设置 BUCKET 开启 Bucket 索引功能hoodie.bucket.index.hash.fieldfalse主键可以设置成主键的子集hoodie.bucket.index.num.bucketsfalse4默认每个 partition 的 bucket 数,当前设置后则不可再变更。 (1)bucket index 没有 state 的存储计算开销,性能较好 (2)bucket index 无法扩 buckets,state index 则可以依据文件的大小动态扩容 (3)bucket index 不支持跨 partition 的变更(如果输入是 cdc 流则没有这个限制),state index 没有限制 5.10 Hudi Catalog

​ 从 0.12.0 开始支持,通过 catalog 可以管理 flink 创建的表,避免重复建表操作,另外 hms 模式的 catalog 支持自动补全 hive 同步参数。

-- DFS 模式 Catalog SQL样例: CREATE CATALOG hoodie_catalog WITH ( 'type'='hudi', 'catalog.path' = '${catalog 的默认路径}', 'mode'='dfs' ); -- Hms 模式 Catalog SQL 样例: CREATE CATALOG hoodie_catalog WITH ( 'type'='hudi', 'catalog.path' = '${catalog 的默认路径}', 'hive.conf.dir' = '${hive-site.xml 所在的目录}', 'mode'='hms' -- 支持 'dfs' 模式通过文件系统管理表属性 ); 名称Required默认值说明catalog.pathtrue–默认的 catalog 根路径,用作表路径的自动推导,默认的表路径: c a t a l o g . p a t h / {catalog.path}/ catalog.path/{db_name}/${table_name}default-databasefalsedefault默认的 database 名hive.conf.dirfalse–hive-site.xml 所在的目录,只在 hms 模式下生效modefalsedfs支持 hms模式通过 hive 管理元数据table.externalfalsefalse是否创建外部表,只在 hms 模式下生效

案例如下:

--(1)创建sql-client初始化sql文件 vim /opt/apps/flink-1.13.6/conf/sql-client-init.sql CREATE CATALOG hoodie_catalog WITH ( 'type'='hudi', 'catalog.path' = '/tmp/hudi_catalog', 'mode'='dfs' ); USE CATALOG hoodie_catalog; --(2)指定sql-client启动时加载sql文件 hadoop fs -mkdir /tmp/hudi_catalog bin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session --(3)建库建表插入 create database test; use test; create table t2( uuid varchar(20), name varchar(10), age int, ts timestamp(3), `partition` varchar(20), primary key (uuid) not enforced ) with ( 'connector' = 'hudi', 'path' = '/tmp/hudi_catalog/default/t2', 'table.type' = 'MERGE_ON_READ' ); insert into t2 values('1','zs',18,TIMESTAMP '1970-01-01 00:00:01','a'); --(4)退出sql-client,重新进入,表信息还在 use test; show tables; select * from t2; 5.11 离线压缩

​ MOR 表的 compaction 默认是自动打开的,策略是 5 个 commits 执行一次压缩。 因为压缩操作比较耗费内存,和写流程放在同一个 pipeline,在数据量比较大的时候(10w+/s qps),容易干扰写流程,此时采用离线定时任务的方式执行 compaction 任务更稳定。

5.11.1 设置参数

Ø compaction.async.enabled 为 false,关闭在线 compaction。

Ø compaction.schedule.enabled 仍然保持开启,由写任务阶段性触发压缩 plan。

5.11.2 原理

一个 compaction 的任务的执行包括两部分:

Ø schedule 压缩 plan

该过程推荐由写任务定时触发,写参数 compaction.schedule.enabled 默认开启

Ø 执行对应的压缩 plan

5.11.3 使用方式

1)执行命令

离线 compaction 需要手动执行 Java 程序,程序入口:

Ø hudi-flink1.13-bundle-0.12.0.jar

Ø org.apache.hudi.sink.compact.HoodieFlinkCompactor

# 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:9000/table

2)参数配置

参数名required默认值备注–pathtrue–目标表的路径–compaction-tasksfalse-1压缩 task 的并发,默认是待压缩 file group 的数量–compaction-max-memoryfalse100 (单位 MB)压缩时 log 数据的索引 map,默认 100MB,内存足够可以开大些–schedulefalsefalse是否要执行 schedule compaction 的操作,当写流程还在持续写入表数据的时候,开启这个参数有丢失查询数据的风险,所以开启该参数一定要保证当前没有任务往表里写数据, 写任务的 compaction plan 默认是一直 schedule 的,除非手动关闭(默认 5 个 commits 一次压缩)–seqfalseLIFO执行压缩任务的顺序,默认是从最新的压缩 plan 开始执行,可选值:LIFO: 从最新的 plan 开始执行;FIFO: 从最老的 plan 开始执行–servicefalsefalse是否开启 service 模式,service 模式会打开常驻进程,一直监听压缩任务并提交到集群执行(从 0.11 开始执行)–min-compaction-interval-secondsfalse600 (单位 秒)service 模式下的执行间隔,默认 10 分钟

案例如下:

create table t7( id int, ts int, primary key (id) not enforced ) with ( 'connector' = 'hudi', 'path' = '/tmp/hudi_catalog/default/t7', 'compaction.async.enabled' = 'false', -- 关闭自动压缩 'compaction.schedule.enabled' = 'true', -- 由写任务阶段性触发压缩 plan 'table.type' = 'MERGE_ON_READ' ); insert into t7 values(1,1); insert into t7 values(2,2); insert into t7 values(3,3); insert into t7 values(4,4); insert into t7 values(5,5); // 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/tmp/hudi_catalog/default/t7 5.12 离线 Clustering

​ 异步的 clustering 相对于 online 的 async clustering 资源隔离,从而更加稳定。

5.12.1 设置参数

Ø clustering.async.enabled 为 false,关闭在线 clustering。

Ø clustering.schedule.enabled 仍然保持开启,由写任务阶段性触发 clustering plan。

5.12.2 原理

一个 clustering 的任务的执行包括两部分:

Ø schedule plan

推荐由写任务定时触发,写参数 clustering.schedule.enabled 默认开启。

Ø 执行对应的 plan

5.12.3 使用方式

1)执行命令

离线 clustering 需要手动执行 Java 程序,程序入口:

Ø hudi-flink1.13-bundle-0.12.0.jar

Ø org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob

注意:必须是分区表,否则报错空指针异常。

# 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/table

2)参数配置

参数名required默认值备注–pathtrue–目标表的路径。–clustering-tasksfalse-1Clustering task 的并发,默认是待压缩 file group 的数量。–schedulefalsefalse是否要执行 schedule clustering plan 的操作,当写流程还在持续写入表数据的时候,开启这个参数有丢失查询数据的风险,所以开启该参数一定要保证当前没有任务往表里写数据, 写任务的 clustering plan 默认是一直 schedule 的,除非手动关闭(默认 4 个 commits 一次 clustering)。–seqfalseFIFO执行压缩任务的顺序,默认是从最老的 clustering plan 开始执行,可选值:LIFO: 从最新的 plan 开始执行;FIFO: 从最老的 plan 开始执行–target-file-max-bytesfalse1024 * 1024 * 1024最大目标文件,默认 1GB。–small-file-limitfalse600小于该大小的文件会参与 clustering,默认 600MB。–sort-columnsfalseN/AClustering 可选排序列。–servicefalsefalse是否开启 service 模式,service 模式会打开常驻进程,一直监听压缩任务并提交到集群执行(从 0.11 开始执行)。–min-compaction-interval-secondsfalse600 (单位 秒)service 模式下的执行间隔,默认 10 分钟。

3)案例演示

create table t8( id int, age int, ts int, primary key (id) not enforced ) partitioned by (age) with ( 'connector' = 'hudi', 'path' = '/tmp/hudi_catalog/default/t8', 'clustering.async.enabled' = 'false', 'clustering.schedule.enabled' = 'true', 'table.type' = 'COPY_ON_WRITE' ); insert into t8 values(1,18,1); insert into t8 values(2,18,2); insert into t8 values(3,18,3); insert into t8 values(4,18,4); insert into t8 values(5,18,5); -- 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/tmp/hudi_catalog/default/t8 5.12.4 常见问题 # 存储一直看不到数据 如果是 streaming 写,请确保开启 checkpoint,Flink 的 writer 有 3 种刷数据到磁盘的策略: 当某个 bucket 在内存积攒到一定大小 (可配,默认 64MB) 当总的 buffer 大小积攒到一定大小(可配,默认 1GB) 当 checkpoint 触发,将内存里的数据全部 flush 出去 # 数据有重复 如果是 COW 写,需要开启参数 write.insert.drop.duplicates,COW 写每个 bucket 的第一个文件默认是不去重的,只有增量的数据会去重,全局去重需要开启该参数;MOR 写不需要开启任何参数,定义好 primary key 后默认全局去重。(注意:从 0.10 版本开始,该属性改名 write.precombine 并且默认为 true。) 如果需要多 partition 去重,需要开启参数: index.global.enabled 为 true。(注意:从 0.10 版本开始,该属性默认true。) 索引 index 是判断数据重复的核心数据结构,index.state.ttl 设置了索引保存的时间,默认为 1.5 天,对于长时间周期的更新,比如更新一个月前的数据,需要将 index.state.ttl 调大(单位天),设置小于 0 代表永久保存。(注意:从 0.10 版本开始,该属性默认为 0。) # Merge On Read 写只有 log 文件 Merge On Read 默认开启了异步的 compaction,策略是 5 个 commits 压缩一次,当条件满足参会触发压缩任务,另外,压缩本身因为耗费资源,所以不一定能跟上写入效率,可能会有滞后。 5.13 Hudi核心原理 5.13.1 Hudi数据去重原理

Hoodie 的数据去重分两步:

(1)写入前攒 buffer 阶段去重,核心接口HoodieRecordPayload#preCombine

(2)写入过程中去重,核心接口HoodieRecordPayload#combineAndGetUpdateValue。

1)消息版本新旧

​ 相同 record key (主键)的数据通过write.precombine.field指定的字段来判断哪个更新,即 precombine 字段更大的 record 更新,如果是相等的 precombine 字段,则后来的数据更新。

​ 从 0.10 版本开始,write.precombine.field 字段为可选,如果没有指定,会看 schema 中是否有 ts 字段,如果有,ts 字段被选为 precombine 字段;如果没有指定,schema 中也没有 ts 字段,则为处理顺序:后来的消息默认较新。

2)攒消息阶段的去重

​ Hoodie 将 buffer 消息发给 write handle 之前可以执行一次去重操作,通过HoodieRecordPayload#preCombine 接口,保留 precombine 字段较大的消息,此操作为纯内存的计算,在同一个 write task 中为单并发执行。

​ 注意:write.precombine 选项控制了攒消息的去重。

3)写 parquet 增量消息的去重

​ 在Hoodie 写入流程中,Hoodie 每写一个 parquet 都会有 base + 增量 merge 的过程,增量的消息会先放到一个 spillable map 的数据结构构建内存 index,这里的增量数据如果没有提前去重,那么同 key 的后来消息会直接覆盖先来的消息。

​ Writer 接着扫 base 文件,过程中会不断查看内存 index 是否有同 key 的新消息,如果有,会走 HoodieRecordPayload#combineAndGetUpdateValue 接口判断保留哪个消息。

​ 注意: MOR 表的 compaction 阶段和 COW 表的写入流程都会有 parquet 增量消息去重的逻辑。

4)跨 partition 的消息去重

​ 默认情况下,不同的 partition 的消息是不去重的,即相同的 key 消息,如果新消息换了 partition,那么老的 partiiton 消息仍然保留。

​ 开启 index.global.enabled 选项开启跨 partition 去重,原理是先往老的 partiton 发一条删除消息,再写新 partition。

5.13.2 Hudi表写入原理

数据写入、数据压缩与数据清理

1)数据写入分析 (1)基础数据封装:将数据流中flink的RowData封装成Hoodie实体; (2)BucketAssigner:桶分配器,主要是给数据分配写入的文件地址:若为插入操作,则取大小最小的FileGroup对应的FileId文件内进行插入;在此文件的后续写入中文件 ID 保持不变,并且提交时间会更新以显示最新版本。这也意味着记录的任何特定版本,给定其分区路径,都可以使用文件 ID 和 instantTime进行唯一定位;若为更新操作,则直接在当前location进行数据更新; (3)Hoodie Stream Writer: 数据写入,将数据缓存起来,在超过设置的最大flushSize或是做checkpoint时进行刷新到文件中; (4)Oprator Coordinator:主要与Hoodie Stream Writer进行交互,处理checkpoint等事件,在做checkpoint时,提交instant到timeLine上,并生成下一个instant的时间,算法为取当前最新的commi时间,比对当前时间与commit时间,若当前时间大于commit时间,则返回,否则一直循环等待生成。 2)数据压缩 压缩(compaction)用于在 MergeOnRead存储类型时将基于行的log日志文件转化为parquet列式数据文件,用于加快记录的查找。compaction首先会遍历各分区下最新的parquet数据文件和其对应的log日志文件进行合并,并生成新的FileSlice,在TimeLine 上提交新的Instance: 具体策略分为4种,具体见官网说明: compaction.trigger.strategy: Strategy to trigger compaction, options are 1.'num_commits': trigger compaction when reach N delta commits; 2.'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction; 3.'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied; 4.'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied. Default is 'num_commits' Default Value: num_commits (Optional) 在项目实践中需要注意参数'read.streaming.skip_compaction' 参数的配置,其表示在流式读取该表是否跳过压缩后的数据,若该表用于后续聚合操作表的输入表,则需要配置值为true,表示聚合操作表不再消费读取压缩数据。若不配置或配置为false,则该表中的数据在未被压缩之前被聚合操作表读取了一次,在压缩后数据又被读取一次,会导致聚合表的sum、count等算子结果出现双倍情况。 3)数据清理 随着用户向表中写入更多数据,对于每次更新,Hudi会生成一个新版本的数据文件用于保存更新后的记录(COPY_ON_WRITE)或将这些增量更新写入日志文件以避免重写更新版本的数据文件(MERGE_ON_READ)。在这种情况下,根据更新频率,文件版本数可能会无限增长,但如果不需要保留无限的历史记录,则必须有一个流程(服务)来回收旧版本的数据,这就是 Hudi 的清理服务。具体清理策略可参考官网,一般使用的清理策略为:KEEP_LATEST_FILE_VERSIONS:此策略具有保持 N 个文件版本而不受时间限制的效果。会删除N之外的FileSlice。 5.13.3 Hudi表读取原理

(1)开启split_monitor算子,每隔N秒(可配置)监听TimeLine上变化,并将变更的Instance封装为FileSlice。

(2)分发log文件时候,按照fileId值进行keyBy,保证同一file group下数据文件都给一个Task进行处理,从而保证数据处理的有序性。

(3)split_reader根据FileSlice信息进行数据读取。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3