高性能Flink SQL优化技巧

您所在的位置:网站首页 amda84555m性能 高性能Flink SQL优化技巧

高性能Flink SQL优化技巧

#高性能Flink SQL优化技巧| 来源: 网络整理| 查看: 265

开启MiniBatch(提升吞吐)

MiniBatch是缓存一定的数据后再触发处理,以减少对State的访问,从而提升吞吐并减少数据的输出量。

MiniBatch主要基于事件消息来触发微批处理,事件消息会按您指定的时间间隔在源头插入。

适用场景

微批处理通过增加延迟换取高吞吐,如果您有超低延迟的要求,不建议开启微批处理。通常对于聚合场景,微批处理可以显著地提升系统性能,建议开启。

开启方式 MiniBatch默认关闭,您需要在作业 高级配置中的 更多Flink配置填写以下代码。 table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 5s参数解释如下表所示。 参数 说明 table.exec.mini-batch.enabled 是否开启mini-batch。 table.exec.mini-batch.allow-latency 批量输出数据的时间间隔。 开启LocalGlobal(解决常见数据热点问题)

LocalGlobal本质上能够靠LocalAgg的聚合筛除部分倾斜数据,从而降低GlobalAgg的热点,提升性能。

LocalGlobal优化将原先的Aggregate分成Local和Global两阶段聚合,即MapReduce模型中的Combine和Reduce两阶段处理模式。第一阶段在上游节点本地攒一批数据进行聚合(localAgg),并输出这次微批的增量值(Accumulator)。第二阶段再将收到的Accumulator合并(Merge),得到最终的结果(GlobalAgg)。

适用场景

提升普通聚合(例如SUM、COUNT、MAX、MIN和AVG)的性能,以及这些场景下的数据热点问题。

使用限制 LocalGlobal是默认开启的,但是有以下限制: 在minibatch开启的前提下才能生效。 需要使用AggregateFunction实现Merge。 判断是否生效

观察最终生成的拓扑图的节点名字中是否包含GlobalGroupAggregate或LocalGroupAggregate。

开启PartialFinal(解决COUNT DISTINCT热点问题)

为了解决COUNT DISTINCT的热点问题,通常需要手动改写为两层聚合(增加按Distinct Key取模的打散层)。目前,实时计算提供了COUNT DISTINCT自动打散,即PartialFinal优化,您无需自行改写为两层聚合。

LocalGlobal优化针对普通聚合(例如SUM、COUNT、MAX、MIN和AVG)有较好的效果,对于COUNT DISTINCT收效不明显,因为COUNT DISTINCT在Local聚合时,对于DISTINCT KEY的去重率不高,导致在Global节点仍然存在热点问题。

适用场景 使用COUNT DISTINCT,但无法满足聚合节点性能要求。 说明 不能在包含UDAF的Flink SQL中使用PartialFinal优化方法。 数据量较少的情况,不建议使用PartialFinal优化方法,浪费资源。因为PartialFinal优化会自动打散成两层聚合,引入额外的网络Shuffle。 开启方式 默认不开启。如果您需要开启,则需要在作业 高级配置中的 更多Flink配置填写以下代码。 table.optimizer.distinct-agg.split.enabled: true 判断是否生效

观察最终生成的拓扑图,是否由原来一层的聚合变成了两层的聚合。

AGG WITH CASE WHEN改写为AGG WITH FILTER语法(提升大量COUNT DISTINCT场景性能) 统计作业需要计算各种维度的UV,例如全网UV、来自手机客户端的UV、来自PC的UV等等。建议使用标准的AGG WITH FILTER语法来代替CASE WHEN实现多维度统计的功能。实时计算目前的SQL优化器能分析出Filter参数,从而同一个字段上计算不同条件下的COUNT DISTINCT能共享State,减少对State的读写操作。性能测试中,使用AGG WITH FILTER语法来代替CASE WHEN能够使性能提升1倍。 适用场景

对于同一个字段上计算不同条件下的COUNT DISTINCT结果的场景,性能提升很大。

原始写法 COUNT(distinct visitor_id) as UV1 , COUNT(distinct case when is_wireless='y' then visitor_id else null end) as UV2 优化写法 COUNT(distinct visitor_id) as UV1 , COUNT(distinct visitor_id) filter (where is_wireless='y') as UV2


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3