窗口

您所在的位置:网站首页 窗口的使用 窗口

窗口

2024-07-14 19:07| 来源: 网络整理| 查看: 265

窗口 #

窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。

下面展示了 Flink 窗口在 keyed streams 和 non-keyed streams 上使用的基本结构。 我们可以看到,这两者唯一的区别仅在于:keyed streams 要调用 keyBy(...)后再调用 window(...) , 而 non-keyed streams 只用直接调用 windowAll(...)。留意这个区别,它能帮我们更好地理解后面的内容。

Keyed Windows

stream .keyBy(...) None: """ Deletes any state in the :class:`Context` when the Window expires (the watermark passes its max_timestamp + allowed_lateness). :param context: The context to which the window is being evaluated. """ pass class Context(ABC, Generic[W2]): """ The context holding window metadata. """ @abstractmethod def window(self) -> W2: """ :return: The window that is being evaluated. """ pass @abstractmethod def current_processing_time(self) -> int: """ :return: The current processing time. """ pass @abstractmethod def current_watermark(self) -> int: """ :return: The current event-time watermark. """ pass @abstractmethod def window_state(self) -> KeyedStateStore: """ State accessor for per-key and per-window state. .. note:: If you use per-window state you have to ensure that you clean it up by implementing :func:`~ProcessWindowFunction.clear`. :return: The :class:`KeyedStateStore` used to access per-key and per-window states. """ pass @abstractmethod def global_state(self) -> KeyedStateStore: """ State accessor for per-key global state. """ pass

key 参数由 keyBy() 中指定的 KeySelector 选出。 如果是给出 key 在 tuple 中的 index 或用属性名的字符串形式指定 key,这个 key 的类型将总是 Tuple, 并且你需要手动将它转换为正确大小的 tuple 才能提取 key。

ProcessWindowFunction 可以像下面这样定义:

DataStream input = ...; input .keyBy(t -> t.f0) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .process(new MyProcessWindowFunction()); /* ... */ public class MyProcessWindowFunction extends ProcessWindowFunction { @Override public void process(String key, Context context, Iterable input, Collector out) { long count = 0; for (Tuple2 in: input) { count++; } out.collect("Window: " + context.window() + "count: " + count); } } val input: DataStream[(String, Long)] = ... input .keyBy(_._1) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .process(new MyProcessWindowFunction()) /* ... */ class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] { def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = { var count = 0L for (in Iterable[str]: count = 0 for _ in elements: count += 1 yield "Window: {} count: {}".format(context.window(), count)

上例使用 ProcessWindowFunction 对窗口中的元素计数,并且将窗口本身的信息一同输出。

注意,使用 ProcessWindowFunction 完成简单的聚合任务是非常低效的。 下一章会说明如何将 ReduceFunction 或 AggregateFunction 与 ProcessWindowFunction 组合成既能 增量聚合又能获得窗口额外信息的窗口函数。 增量聚合的 ProcessWindowFunction #

ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果。 这样它就可以增量聚合窗口的元素并且从 ProcessWindowFunction` 中获得窗口的元数据。

你也可以对过时的 WindowFunction 使用增量聚合。

使用 ReduceFunction 增量聚合 #

下例展示了如何将 ReduceFunction 与 ProcessWindowFunction 组合,返回窗口中的最小元素和窗口的开始时间。

DataStream input = ...; input .keyBy() .window() .reduce(new MyReduceFunction(), new MyProcessWindowFunction()); // Function definitions private static class MyReduceFunction implements ReduceFunction { public SensorReading reduce(SensorReading r1, SensorReading r2) { return r1.value() > r2.value() ? r2 : r1; } } private static class MyProcessWindowFunction extends ProcessWindowFunction { public void process(String key, Context context, Iterable minReadings, Collector out) { SensorReading min = minReadings.iterator().next(); out.collect(new Tuple2(context.window().getStart(), min)); } } val input: DataStream[SensorReading] = ... input .keyBy() .window() .reduce( (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 }, ( key: String, context: ProcessWindowFunction[_, _, _, TimeWindow]#Context, minReadings: Iterable[SensorReading], out: Collector[(Long, SensorReading)] ) => { val min = minReadings.iterator.next() out.collect((context.window.getStart, min)) } ) input = ... # type: DataStream input \ .key_by() \ .window() \ .reduce(lambda r1, r2: r2 if r1.value > r2.value else r1, window_function=MyProcessWindowFunction(), output_type=Types.TUPLE([Types.STRING(), Types.LONG()])) # Function definition class MyProcessWindowFunction(ProcessWindowFunction): def process(self, key: str, context: ProcessWindowFunction.Context, min_readings: Iterable[SensorReading]) -> Iterable[Tuple[int, SensorReading]]: min = next(iter(min_readings)) yield context.window().start, min 使用 AggregateFunction 增量聚合 #

下例展示了如何将 AggregateFunction 与 ProcessWindowFunction 组合,计算平均值并与窗口对应的 key 一同输出。

DataStream input = ...; input .keyBy() .window() .aggregate(new AverageAggregate(), new MyProcessWindowFunction()); // Function definitions /** * The accumulator is used to keep a running sum and a count. The {@code getResult} method * computes the average. */ private static class AverageAggregate implements AggregateFunction { @Override public Tuple2 createAccumulator() { return new Tuple2(0L, 0L); } @Override public Tuple2 add(Tuple2 value, Tuple2 accumulator) { return new Tuple2(accumulator.f0 + value.f1, accumulator.f1 + 1L); } @Override public Double getResult(Tuple2 accumulator) { return ((double) accumulator.f0) / accumulator.f1; } @Override public Tuple2 merge(Tuple2 a, Tuple2 b) { return new Tuple2(a.f0 + b.f0, a.f1 + b.f1); } } private static class MyProcessWindowFunction extends ProcessWindowFunction { public void process(String key, Context context, Iterable averages, Collector out) { Double average = averages.iterator().next(); out.collect(new Tuple2(key, average)); } } val input: DataStream[(String, Long)] = ... input .keyBy() .window() .aggregate(new AverageAggregate(), new MyProcessWindowFunction()) // Function definitions /** * The accumulator is used to keep a running sum and a count. The [getResult] method * computes the average. */ class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] { override def createAccumulator() = (0L, 0L) override def add(value: (String, Long), accumulator: (Long, Long)) = (accumulator._1 + value._2, accumulator._2 + 1L) override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2 override def merge(a: (Long, Long), b: (Long, Long)) = (a._1 + b._1, a._2 + b._2) } class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] { def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]) = { val average = averages.iterator.next() out.collect((key, average)) } } input = ... # type: DataStream input .key_by() \ .window() \ .aggregate(AverageAggregate(), window_function=MyProcessWindowFunction(), accumulator_type=Types.TUPLE([Types.LONG(), Types.LONG()]), output_type=Types.TUPLE([Types.STRING(), Types.DOUBLE()])) # Function definitions class AverageAggregate(AggregateFunction): """ The accumulator is used to keep a running sum and a count. The :func:`get_result` method computes the average. """ def create_accumulator(self) -> Tuple[int, int]: return 0, 0 def add(self, value: Tuple[str, int], accumulator: Tuple[int, int]) -> Tuple[int, int]: return accumulator[0] + value[1], accumulator[1] + 1 def get_result(self, accumulator: Tuple[int, int]) -> float: return accumulator[0] / accumulator[1] def merge(self, a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]: return a[0] + b[0], a[1] + b[1] class MyProcessWindowFunction(ProcessWindowFunction): def process(self, key: str, context: ProcessWindowFunction.Context, averages: Iterable[float]) -> Iterable[Tuple[str, float]]: average = next(iter(averages)) yield key, average 在 ProcessWindowFunction 中使用 per-window state #

除了访问 keyed state (任何富函数都可以),ProcessWindowFunction 还可以使用作用域仅为 “当前正在处理的窗口”的 keyed state。在这种情况下,理解 per-window 中的 window 指的是什么非常重要。 总共有以下几种窗口的理解:

在窗口操作中定义的窗口:比如定义了长一小时的滚动窗口或长两小时、滑动一小时的滑动窗口。 对应某个 key 的窗口实例:比如 以 user-id xyz 为 key,从 12:00 到 13:00 的时间窗口。 具体情况取决于窗口的定义,根据具体的 key 和时间段会产生诸多不同的窗口实例。

Per-window state 作用于后者。也就是说,如果我们处理有 1000 种不同 key 的事件, 并且目前所有事件都处于 [12:00, 13:00) 时间窗口内,那么我们将会得到 1000 个窗口实例, 且每个实例都有自己的 keyed per-window state。

process() 接收到的 Context 对象中有两个方法允许我们访问以下两种 state:

globalState(),访问全局的 keyed state windowState(), 访问作用域仅限于当前窗口的 keyed state

如果你可能将一个 window 触发多次(比如当你的迟到数据会再次触发窗口计算, 或你自定义了根据推测提前触发窗口的 trigger),那么这个功能将非常有用。 这时你可能需要在 per-window state 中储存关于之前触发的信息或触发的总次数。

当使用窗口状态时,一定记得在删除窗口时清除这些状态。他们应该定义在 clear() 方法中。

WindowFunction(已过时) #

在某些可以使用 ProcessWindowFunction 的地方,你也可以使用 WindowFunction。 它是旧版的 ProcessWindowFunction,只能提供更少的环境信息且缺少一些高级的功能,比如 per-window state。 这个接口会在未来被弃用。

WindowFunction 的签名如下:

public interface WindowFunction extends Function, Serializable { /** * Evaluates the window and outputs none or several elements. * * @param key The key for which this window is evaluated. * @param window The window that is being evaluated. * @param input The elements in the window being evaluated. * @param out A collector for emitting elements. * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ void apply(KEY key, W window, Iterable input, Collector out) throws Exception; } trait WindowFunction[IN, OUT, KEY, W Iterable[OUT]: """ Evaluates the window and outputs none or several elements. :param key: The key for which this window is evaluated. :param window: The window that is being evaluated. :param inputs: The elements in the window being evaluated. """ pass

它可以像下例这样使用:

DataStream input = ...; input .keyBy() .window() .apply(new MyWindowFunction()); val input: DataStream[(String, Long)] = ... input .keyBy() .window() .apply(new MyWindowFunction()) input = ... # type: DataStream input \ .key_by() \ .window() \ .apply(MyWindowFunction()) Triggers #

Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理。 每个 WindowAssigner 都有一个默认的 Trigger。 如果默认 trigger 无法满足你的需要,你可以在 trigger(...) 调用中指定自定义的 trigger。

Trigger 接口提供了五个方法来响应不同的事件:

onElement() 方法在每个元素被加入窗口时调用。 onEventTime() 方法在注册的 event-time timer 触发时调用。 onProcessingTime() 方法在注册的 processing-time timer 触发时调用。 onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。 最后,clear() 方法处理在对应窗口被移除时所需的逻辑。

有两点需要注意:

前三个方法通过返回 TriggerResult 来决定 trigger 如何应对到达窗口的事件。应对方案有以下几种: CONTINUE: 什么也不做 FIRE: 触发计算 PURGE: 清空窗口内的元素 FIRE_AND_PURGE: 触发计算,计算结束后清空窗口内的元素 上面的任意方法都可以用来注册 processing-time 或 event-time timer。 触发(Fire)与清除(Purge) #

当 trigger 认定一个窗口可以被计算时,它就会触发,也就是返回 FIRE 或 FIRE_AND_PURGE。 这是让窗口算子发送当前窗口计算结果的信号。 如果一个窗口指定了 ProcessWindowFunction,所有的元素都会传给 ProcessWindowFunction。 如果是 ReduceFunction 或 AggregateFunction,则直接发送聚合的结果。

当 trigger 触发时,它可以返回 FIRE 或 FIRE_AND_PURGE。 FIRE 会保留被触发的窗口中的内容,而 FIRE_AND_PURGE 会删除这些内容。 Flink 内置的 trigger 默认使用 FIRE,不会清除窗口的状态。

Purge 只会移除窗口的内容, 不会移除关于窗口的 meta-information 和 trigger 的状态。 WindowAssigner 默认的 Triggers #

WindowAssigner 默认的 Trigger 足以应付诸多情况。 比如说,所有的 event-time window assigner 都默认使用 EventTimeTrigger。 这个 trigger 会在 watermark 越过窗口结束时间后直接触发。

GlobalWindow 的默认 trigger 是永远不会触发的 NeverTrigger。因此,使用 GlobalWindow 时,你必须自己定义一个 trigger。

当你在 trigger() 中指定了一个 trigger 时, 你实际上覆盖了当前 WindowAssigner 默认的 trigger。 比如说,如果你指定了一个 CountTrigger 给 TumblingEventTimeWindows,你的窗口将不再根据时间触发, 而是根据元素数量触发。如果你希望即响应时间,又响应数量,就需要自定义 trigger 了。 内置 Triggers 和自定义 Triggers #

Flink 包含一些内置 trigger。

之前提到过的 EventTimeTrigger 根据 watermark 测量的 event time 触发。 ProcessingTimeTrigger 根据 processing time 触发。 CountTrigger 在窗口中的元素超过预设的限制时触发。 PurgingTrigger 接收另一个 trigger 并将它转换成一个会清理数据的 trigger。

如果你需要实现自定义的 trigger,你应该看看这个抽象类 Trigger 。 请注意,这个 API 仍在发展,所以在之后的 Flink 版本中可能会发生变化。

Evictors #

Flink 的窗口模型允许在 WindowAssigner 和 Trigger 之外指定可选的 Evictor。 如本文开篇的代码中所示,通过 evictor(...) 方法传入 Evictor。 Evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素。 Evictor 接口提供了两个方法实现此功能:

/** * Optionally evicts elements. Called before windowing function. * * @param elements The elements currently in the pane. * @param size The current number of elements in the pane. * @param window The {@link Window} * @param evictorContext The context for the Evictor */ void evictBefore(Iterable elements, int size, W window, EvictorContext evictorContext); /** * Optionally evicts elements. Called after windowing function. * * @param elements The elements currently in the pane. * @param size The current number of elements in the pane. * @param window The {@link Window} * @param evictorContext The context for the Evictor */ void evictAfter(Iterable elements, int size, W window, EvictorContext evictorContext);

evictBefore() 包含在调用窗口函数前的逻辑,而 evictAfter() 包含在窗口函数调用之后的逻辑。 在调用窗口函数之前被移除的元素不会被窗口函数计算。

Flink 内置有三个 evictor:

CountEvictor: 仅记录用户指定数量的元素,一旦窗口中的元素超过这个数量,多余的元素会从窗口缓存的开头移除 DeltaEvictor: 接收 DeltaFunction 和 threshold 参数,计算最后一个元素与窗口缓存中所有元素的差值, 并移除差值大于或等于 threshold 的元素。 TimeEvictor: 接收 interval 参数,以毫秒表示。 它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - interval 小的所有元素。

默认情况下,所有内置的 evictor 逻辑都在调用窗口函数前执行。

指定一个 evictor 可以避免预聚合,因为窗口中的所有元素在计算前都必须经过 evictor。 Note: Evictor 在 Python DataStream API 中还不支持.

Flink 不对窗口中元素的顺序做任何保证。也就是说,即使 evictor 从窗口缓存的开头移除一个元素,这个元素也不一定是最先或者最后到达窗口的。

Allowed Lateness #

在使用 event-time 窗口时,数据可能会迟到,即 Flink 用来追踪 event-time 进展的 watermark 已经 越过了窗口结束的 timestamp 后,数据才到达。对于 Flink 如何处理 event time, event time 和 late elements 有更详细的探讨。

默认情况下,watermark 一旦越过窗口结束的 timestamp,迟到的数据就会被直接丢弃。 但是 Flink 允许指定窗口算子最大的 allowed lateness。 Allowed lateness 定义了一个元素可以在迟到多长时间的情况下不被丢弃,这个参数默认是 0。 在 watermark 超过窗口末端、到达窗口末端加上 allowed lateness 之前的这段时间内到达的元素, 依旧会被加入窗口。取决于窗口的 trigger,一个迟到但没有被丢弃的元素可能会再次触发窗口,比如 EventTimeTrigger。

为了实现这个功能,Flink 会将窗口状态保存到 allowed lateness 超时才会将窗口及其状态删除 (如 Window Lifecycle 所述)。

默认情况下,allowed lateness 被设为 0。即 watermark 之后到达的元素会被丢弃。

你可以像下面这样指定 allowed lateness:

DataStream input = ...; input .keyBy() .window() .allowedLateness() .(); val input: DataStream[T] = ... input .keyBy() .window() .allowedLateness() .() input = ... # type: DataStream input \ .key_by() \ .window() \ .allowed_lateness() \ .() 使用 GlobalWindows 时,没有数据会被视作迟到,因为全局窗口的结束 timestamp 是 Long.MAX_VALUE。 从旁路输出(side output)获取迟到数据 #

通过 Flink 的 旁路输出 功能,你可以获得迟到数据的数据流。

首先,你需要在开窗后的 stream 上使用 sideOutputLateData(OutputTag) 表明你需要获取迟到数据。 然后,你就可以从窗口操作的结果中获取旁路输出流了。

final OutputTag lateOutputTag = new OutputTag("late-data"){}; DataStream input = ...; SingleOutputStreamOperator result = input .keyBy() .window() .allowedLateness() .sideOutputLateData(lateOutputTag) .(); DataStream lateStream = result.getSideOutput(lateOutputTag); val lateOutputTag = OutputTag[T]("late-data") val input: DataStream[T] = ... val result = input .keyBy() .window() .allowedLateness() .sideOutputLateData(lateOutputTag) .() val lateStream = result.getSideOutput(lateOutputTag) late_output_tag = OutputTag("late-data", type_info) input = ... # type: DataStream result = input \ .key_by() \ .window() \ .allowed_lateness() \ .side_output_late_data(late_output_tag) \ .() late_stream = result.get_side_output(late_output_tag) 迟到数据的一些考虑 #

当指定了大于 0 的 allowed lateness 时,窗口本身以及其中的内容仍会在 watermark 越过窗口末端后保留。 这时,如果一个迟到但未被丢弃的数据到达,它可能会再次触发这个窗口。 这种触发被称作 late firing,与表示第一次触发窗口的 main firing 相区别。 如果是使用会话窗口的情况,late firing 可能会进一步合并已有的窗口,因为他们可能会连接现有的、未被合并的窗口。

你应该注意:late firing 发出的元素应该被视作对之前计算结果的更新,即你的数据流中会包含一个相同计算任务的多个结果。你的应用需要考虑到这些重复的结果,或去除重复的部分。 Working with window results #

窗口操作的结果会变回 DataStream,并且窗口操作的信息不会保存在输出的元素中。 所以如果你想要保留窗口的 meta-information,你需要在 ProcessWindowFunction 里手动将他们放入输出的元素中。 输出元素中保留的唯一相关的信息是元素的 timestamp。 它被设置为窗口能允许的最大 timestamp,也就是 end timestamp - 1,因为窗口末端的 timestamp 是排他的。 这个情况同时适用于 event-time 窗口和 processing-time 窗口。 也就是说,在窗口操作之后,元素总是会携带一个 event-time 或 processing-time timestamp。 对 Processing-time 窗口来说,这并不意味着什么。 而对于 event-time 窗口来说,“输出携带 timestamp” 以及 “watermark 与窗口的相互作用” 这两者使建立窗口大小相同的连续窗口操作(consecutive windowed operations) 变为可能。我们先看看 watermark 与窗口的相互作用,然后再来讨论它。

Interaction of watermarks and windows #

继续阅读这个章节之前,你可能想要先了解一下我们介绍 event time 和 watermarks 的内容。

当 watermark 到达窗口算子时,它触发了两件事:

这个 watermark 触发了所有最大 timestamp(即 end-timestamp - 1)小于它的窗口 这个 watermark 被原封不动地转发给下游的任务。

通俗来讲,watermark 将当前算子中那些“一旦这个 watermark 被下游任务接收就肯定会就超时”的窗口全部冲走。

Consecutive windowed operations #

如之前提到的,窗口结果的 timestamp 如何计算以及 watermark 如何与窗口相互作用使串联多个窗口操作成为可能。 这提供了一种便利的方法,让你能够有两个连续的窗口,他们即能使用不同的 key, 又能让上游操作中某个窗口的数据出现在下游操作的相同窗口。参考下例:

DataStream input = ...; DataStream resultsPerKey = input .keyBy() .window(TumblingEventTimeWindows.of(Time.seconds(5))) .reduce(new Summer()); DataStream globalResults = resultsPerKey .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new TopKWindowFunction()); val input: DataStream[Int] = ... val resultsPerKey = input .keyBy() .window(TumblingEventTimeWindows.of(Time.seconds(5))) .reduce(new Summer()) val globalResults = resultsPerKey .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new TopKWindowFunction()) input = ... # type: DataStream results_per_key = input \ .key_by() \ .window(TumblingEventTimeWindows.of(Time.seconds(5))) \ .reduce(Summer()) global_results = results_per_key \ .window_all(TumblingProcessingTimeWindows.of(Time.seconds(5))) \ .process(TopKWindowFunction())

这个例子中,第一个操作中时间窗口[0, 5) 的结果会出现在下一个窗口操作的 [0, 5) 窗口中。 这就可以让我们先在一个窗口内按 key 求和,再在下一个操作中找出这个窗口中 top-k 的元素。

关于状态大小的考量 #

窗口可以被定义在很长的时间段上(比如几天、几周或几个月)并且积累下很大的状态。 当你估算窗口计算的储存需求时,可以铭记几条规则:

Flink 会为一个元素在它所属的每一个窗口中都创建一个副本。 因此,一个元素在滚动窗口的设置中只会存在一个副本(一个元素仅属于一个窗口,除非它迟到了)。 与之相反,一个元素可能会被拷贝到多个滑动窗口中,就如我们在 Window Assigners 中描述的那样。 因此,设置一个大小为一天、滑动距离为一秒的滑动窗口可能不是个好想法。

ReduceFunction 和 AggregateFunction 可以极大地减少储存需求,因为他们会就地聚合到达的元素, 且每个窗口仅储存一个值。而使用 ProcessWindowFunction 需要累积窗口中所有的元素。

使用 Evictor 可以避免预聚合, 因为窗口中的所有数据必须先经过 evictor 才能进行计算(详见 Evictors)。

Back to top



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3