Flink Core API写入和消费Hudi,流批一体落地实践

您所在的位置:网站首页 flink写数据到hive Flink Core API写入和消费Hudi,流批一体落地实践

Flink Core API写入和消费Hudi,流批一体落地实践

2023-07-12 16:24| 来源: 网络整理| 查看: 265

我们之前介绍了使用Flink CDC将数据实时写入Hudi,以实现对业务库的实时同步。近期,我们对公司的数据流转工具DataX和Sqoop进行了替换,将原先的MySQL到Hive的数据流转切换到了Hudi上。这一优化方案在减轻对业务库压力和提升实时性方面都取得了预期的效果。 (进flink群 +v:zoomake1024)

在进一步优化这个流程时,我们采取了以下措施:

① 通过加载Hudi的--source-avro-schema-path来获取字段及其类型,避免在代码中硬编码。这样做可以使字段及其类型的定义更加灵活,方便进行调整和变更。

② 当业务库增加、删除或修改字段时,我们通过广播流的方式,在不重启Job的情况下实现动态调整Table Schema。这使得我们能够在流程运行过程中灵活应对业务库结构的变化,而无需中断或重启整个任务。

③ 在读取datetime类型列时,我们解决了时区问题,以确保数据的一致性。时区问题可能导致数据在不同系统之间的不一致性,我们针对这个问题进行了处理,保证了准确的时间处理和数据同步。

随着这一流程的稳定,对实时性要求不是很高的流处理(分钟级别)进行了迁移,用Hudi替代Kafka,全链路流批一体。因为通过Flink写入Hudi的底层使用的CheckPoint,默认情况下5个checkpoint触发一次commit,所以实时性要求很高的流处理任务暂时不建议迁移,经过测试,分钟级别的任务是完全没有问题的。也许随着Hudi的发展,其写入和消费的实时性也会进一步提升,替代更多实时任务也会成为可能。

消费Hudi的实现参考Hudi源码中的HoodieTableSource,HoodieSourceFactory代码与HoodieTableFactory代码完全一致,之后会进行优化,删除冗余代码等,HoodieSource的核心方法如下:

public DataStream produceDataStream(StreamExecutionEnvironment execEnv) { String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]); DataType[] schemaTypes = this.schema.getColumnDataTypes().toArray(new DataType[0]); LogicalType[] logicalType = TypeConversions.fromDataToLogicalType(schemaTypes); TypeInformation typeInfo = InternalTypeInfo.ofFields(logicalType,schemaFieldNames); if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) { StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction( conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths()); InputFormat inputFormat = getInputFormat(true); OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor")) .setParallelism(1) .transform("split_reader", typeInfo, factory) .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); return new DataStreamSource(source); } else { InputFormatSourceFunction func = new InputFormatSourceFunction(getInputFormat(), typeInfo); DataStreamSource source = execEnv.addSource(func, asSummaryString(), typeInfo); return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); } }

通过READ_AS_STREAMING可以控制是流处理(常驻内存),还是批处理(消费完Hudi数据后结束Job)。

Job main方法中的代码如下:

String hudiTablePath = "hdfs://......"; Configuration conf = new Configuration(); conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name()); conf.setString(FlinkOptions.PATH, hudiTablePath); conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, "hdfs://......"); conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition"); conf.setString(FlinkOptions.TABLE_NAME, "tableName"); conf.setString(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_SNAPSHOT); conf.setString(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST); conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.enableCheckpointing(3000); ResolvedSchema schema = SchemaBuilder.instance() .field() .field() ...... .primaryKey() .build(); StreamerUtil.initTableIfNotExists(conf); MockContext sourceContext = MockContext.getInstance(conf, schema, "partition"); HoodieSource hoodieSource = (HoodieSource) new HoodieSourceFactory().createDynamicTableSource(sourceContext); SingleOutputStreamOperator result = tableSource.produceDataStream(env) .map(new MapFunction() { @Override public Tuple2 map(RowData value) throws Exception { ...... return new Tuple2(); } });

通过以上架构升级,再结合上一篇介绍的写入Hudi的代码,我们成功实现了Hudi --> Flink --> Hudi的全链路流批一体化。

这次架构升级对某一业务流程产生了以下影响:

① 在过去的流程中,相同的数据流需要经历批处理和流处理两次处理,而现在只需进行一次处理。此外,离线链路的一部分流程可以提前处理,从而将耗时从5小时缩短到了3小时。

② 由于Hudi支持数据的修改,数据回溯模式得到极大简化。数据回溯周期也大幅度缩短,回溯30天数据的时间从过去的12天缩短到了5天。

通过这次架构升级,我们显著提升了流程的效率和数据回溯的速度,为业务流程带来了实质性的改进。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3