Flink基于 DataStream API 实现欺诈检测示例解析

您所在的位置:网站首页 flink代码示例 Flink基于 DataStream API 实现欺诈检测示例解析

Flink基于 DataStream API 实现欺诈检测示例解析

2023-07-16 08:06| 来源: 网络整理| 查看: 265

欢迎关注今日头条号、微信公众号、知乎号:仰望夜空一万次

随意聊聊并记录从小城市到上海工作生活的所思所想。

不去记录,有些事情都好像没有发生过。

阅读概念和原理后,再次阅读这个示例,感觉清澈如同秋日的天空。

原始的文章为:欺诈检测

最终完整的代码示例如下:

package spendreport; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.walkthrough.common.entity.Alert; import org.apache.flink.walkthrough.common.entity.Transaction; /**此类继承KeyedProcessFunction抽象类, KeyedProcessFunction抽象类实现了RichFunction接口的open方法, 此类对open方法进行覆盖实现,用于注册状态。 */ public class FraudDetector extends KeyedProcessFunction { private static final long serialVersionUID = 1L; private static final double SMALL_AMOUNT = 1.00; private static final double LARGE_AMOUNT = 500.00; private static final long ONE_MINUTE = 60 * 1000; private transient ValueState flagState; private transient ValueState timerState; /** 状态需要使用 open() 函数来注册状态 */ @Override public void open(Configuration parameters) { ValueStateDescriptor flagDescriptor = new ValueStateDescriptor( "flag", Types.BOOLEAN); flagState = getRuntimeContext().getState(flagDescriptor); ValueStateDescriptor timerDescriptor = new ValueStateDescriptor( "timer-state", Types.LONG); timerState = getRuntimeContext().getState(timerDescriptor); } /** 真正处理流数据中的每条记录 */ @Override public void processElement( Transaction transaction, Context context, Collector collector) throws Exception { // Get the current state for the current key Boolean lastTransactionWasSmall = flagState.value(); // Check if the flag is set if (lastTransactionWasSmall != null) { if (transaction.getAmount() > LARGE_AMOUNT) { //Output an alert downstream Alert alert = new Alert(); alert.setId(transaction.getAccountId()); collector.collect(alert); } // Clean up our state cleanUp(context); } if (transaction.getAmount() < SMALL_AMOUNT) { // set the flag to true flagState.update(true); long timer = context.timerService().currentProcessingTime() + ONE_MINUTE; /**timerService定时器服务可以用于查询当前时间、注册定时器和删除定时器。*/ context.timerService().registerProcessingTimeTimer(timer); timerState.update(timer); } } /** 当定时器触发时,将会调用onTimer方法。 通过重写这个方法来实现一个你自己的重置状态的回调逻辑。 */ @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) { // remove flag after 1 minute timerState.clear(); flagState.clear(); } private void cleanUp(Context ctx) throws Exception { // delete timer Long timer = timerState.value(); ctx.timerService().deleteProcessingTimeTimer(timer); // clean up all state timerState.clear(); flagState.

Flink 中最基础的状态类型是 ValueState,ValueState 是一种 keyed state 。

ValueState的实例允许空值的 ,对于一个boolean类型来说,有null,true,false三种类型。

ValueState接口的继承关系如下所示:

 

KeyedProcessFunction抽象类的的继承关系为:

 

KeyedProcessFunction类如下所示,K表示groupby所使用的key,I表示输入的类型,O表示输出的类型

public abstract class KeyedProcessFunction extends AbstractRichFunction { .... }

KeyedProcessFunction抽象类内部结构如下图所示,有onTimer定时器方法和对每个元素处理的 processElement方法。

根据具体业务,实现这两个方法和open方法初始化状态来满足业务开发。

 

参考:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/try-flink/datastream_api.html



【本文地址】


今日新闻


推荐新闻


    CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3