Flink State Programming (Part 6): Aggregating State (AggregatingState) Example


2024-07-11 23:18 | Source: compiled from the web


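Here is a simple example. On a stream of user click events, we compute each user's average timestamp once every 5 events. This is similar to averaging over a count window (CountWindow). Here we implement it with a RichFlatMapFunction that keeps an aggregating state (AggregatingState), plus a value state (ValueState) that counts the events.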
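An aggregating state is driven by an AggregateFunction with four methods: createAccumulator, add, getResult and merge. The plain-Java sketch below has no Flink dependency and shows what each method contributes to the average. It models the accumulator as a (sum, count) pair. `AvgTsAggregateSketch` and `AvgAcc` are hypothetical names; `AvgAcc` stands in for the `Tuple2<Long, Long>` used by the real job. This is not Flink's API.

```java
import java.util.List;

public class AvgTsAggregateSketch {

    // Hypothetical stand-in for Tuple2<Long, Long>: sum of timestamps and number of events.
    static final class AvgAcc {
        final long sum;
        final long count;

        AvgAcc(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Mirrors createAccumulator(): an empty accumulator.
    static AvgAcc createAccumulator() {
        return new AvgAcc(0L, 0L);
    }

    // Mirrors add(): fold one timestamp into the accumulator.
    static AvgAcc add(long timestamp, AvgAcc acc) {
        return new AvgAcc(acc.sum + timestamp, acc.count + 1);
    }

    // Mirrors getResult(): integer division, so the average is truncated toward zero.
    static long getResult(AvgAcc acc) {
        return acc.sum / acc.count;
    }

    // Mirrors merge(): combine two partial accumulators by adding both fields.
    static AvgAcc merge(AvgAcc a, AvgAcc b) {
        return new AvgAcc(a.sum + b.sum, a.count + b.count);
    }

    public static void main(String[] args) {
        AvgAcc acc = createAccumulator();
        for (long ts : List.of(1000L, 2000L, 3000L, 4000L, 5000L)) {
            acc = add(ts, acc);
        }
        System.out.println(getResult(acc)); // prints 3000
    }
}
```

The accumulator stores the sum and the count, not a running average. This keeps `merge` trivial and exact: two partial results combine by adding their fields.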

Code:

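import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;

// Event (with public fields user and timestamp) and ClickSource are defined elsewhere in the project (see the full code on Gitee)
public class AverageTimestampExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        // The explicit <Event> type witness is needed so withTimestampAssigner type-checks
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );

        stream.print("input");

        // Custom per-user average-timestamp computation over every 5 events
        stream.keyBy(data -> data.user)
                .flatMap(new AvgTsResult(5L))
                .print();

        env.execute();
    }

    // Custom RichFlatMapFunction: emits one average per `count` events for each user
    public static class AvgTsResult extends RichFlatMapFunction<Event, String> {
        // Size of the count "window": how many events are averaged per output
        private final Long count;

        public AvgTsResult(Long count) {
            this.count = count;
        }

        // Aggregating state that accumulates timestamps and yields their average
        private AggregatingState<Event, Long> avgTsAggState;

        // Value state holding how many events this user has sent in the current window
        private ValueState<Long> countState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Obtain the states from the runtime context
            // ACC: Tuple2<Long, Long>, f0 = sum of all timestamps, f1 = number of events so far
            // OUT: the average timestamp
            avgTsAggState = getRuntimeContext().getAggregatingState(
                    new AggregatingStateDescriptor<Event, Tuple2<Long, Long>, Long>(
                            "avg-ts",
                            new AggregateFunction<Event, Tuple2<Long, Long>, Long>() {
                                @Override
                                public Tuple2<Long, Long> createAccumulator() {
                                    return Tuple2.of(0L, 0L);
                                }

                                @Override
                                public Tuple2<Long, Long> add(Event value, Tuple2<Long, Long> accumulator) {
                                    return Tuple2.of(accumulator.f0 + value.timestamp, accumulator.f1 + 1);
                                }

                                @Override
                                public Long getResult(Tuple2<Long, Long> accumulator) {
                                    // Integer division: the average is truncated to whole milliseconds
                                    return accumulator.f0 / accumulator.f1;
                                }

                                @Override
                                public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                                    // Combine two partial accumulators instead of returning null
                                    return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                                }
                            },
                            Types.TUPLE(Types.LONG, Types.LONG)
                    ));

            countState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Long>("count", Long.class));
        }

        @Override
        public void flatMap(Event value, Collector<String> out) throws Exception {
            // Every incoming event increments this user's count
            Long currCount = countState.value();
            // No value in the state yet means this is the first event; otherwise increment
            if (currCount == null) {
                currCount = 1L;
            } else {
                currCount++;
            }
            // Update both states
            countState.update(currCount);
            avgTsAggState.add(value);

            // Once the count reaches the count-window size, emit the average
            if (currCount.equals(count)) {
                out.collect(value.user + "'s average timestamp over the last " + count + " visits: "
                        + new Timestamp(avgTsAggState.get()));
                // Clear both states so the next window starts fresh
                avgTsAggState.clear();
                countState.clear();
            }
        }
    }
}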
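The stream is keyed by `data.user`, so both `countState` and `avgTsAggState` are scoped to the current user. Each user gets their own counter and accumulator, and `clear()` resets only that user's state. The plain-Java sketch below simulates this behavior. It uses a `HashMap` as a hypothetical stand-in for Flink's keyed state backend. `KeyedAvgSimulation` and `onEvent` are illustrative names, not Flink APIs, and the users and timestamps in `main` are made-up sample data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedAvgSimulation {

    private final long windowSize;
    // Stand-in for keyed state: user -> {timestamp sum, event count}.
    private final Map<String, long[]> state = new HashMap<>();

    KeyedAvgSimulation(long windowSize) {
        this.windowSize = windowSize;
    }

    // Processes one event; returns the average when this user's count reaches windowSize, else null.
    Long onEvent(String user, long timestamp) {
        long[] acc = state.computeIfAbsent(user, k -> new long[2]);
        acc[0] += timestamp;
        acc[1] += 1;
        if (acc[1] == windowSize) {
            long avg = acc[0] / acc[1];
            // Like calling avgTsAggState.clear() and countState.clear() for this key only
            state.remove(user);
            return avg;
        }
        return null;
    }

    public static void main(String[] args) {
        KeyedAvgSimulation sim = new KeyedAvgSimulation(2L);
        List<String> out = new ArrayList<>();
        String[] users = {"Mary", "Bob", "Mary", "Bob", "Mary"};
        long[] ts = {1000L, 1500L, 3000L, 2500L, 9000L};
        for (int i = 0; i < users.length; i++) {
            Long avg = sim.onEvent(users[i], ts[i]);
            if (avg != null) {
                out.add(users[i] + "=" + avg);
            }
        }
        System.out.println(out); // prints [Mary=2000, Bob=2000]
    }
}
```

In this run, Mary's third event (9000) starts a fresh window and produces no output yet. Bob's events never mix with Mary's count.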

The complete code is available on Gitee.


