细说FlinkSQL&TableAPI内置函数和用户自定义函数

您所在的位置:网站首页 细说javascript 细说FlinkSQL&TableAPI内置函数和用户自定义函数

细说FlinkSQL&TableAPI内置函数和用户自定义函数

2023-06-01 05:37| 来源: 网络整理| 查看: 265

Flink Table 和 SQL内置了很多SQL中支持的函数;如果有无法满足的需要,则可以实现用户自定义的函数(UDF)来解决。

一、系统内置函数

Flink Table API 和 SQL为用户提供了一组用于数据转换的内置函数。SQL中支持的很多函数,Table API和SQL都已经做了实现,其它还在快速开发扩展中。

以下是一些典型函数的举例,全部的内置函数,可以参考官网介绍。

https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/functions/systemFunctions.html

1. 比较函数

SQL:

value1 = value2

value1 > value2

Table API:

ANY1 === ANY2

ANY1 > ANY2

2. 逻辑函数

SQL:

boolean1 OR boolean2

boolean IS FALSE

NOT boolean

Table API:

BOOLEAN1 || BOOLEAN2

BOOLEAN.isFalse

!BOOLEAN

3. 算术函数

SQL:

numeric1 + numeric2

POWER(numeric1, numeric2)

Table API:

NUMERIC1 + NUMERIC2

NUMERIC1.power(NUMERIC2)

4. 字符串函数

SQL:

string1 || string2

UPPER(string)

CHAR_LENGTH(string)

Table API:

STRING1 + STRING2

STRING.upperCase()

STRING.charLength()

5. 时间函数

SQL:

DATE string

TIMESTAMP string

CURRENT_TIME

INTERVAL string range

Table API:

STRING.toDate

STRING.toTimestamp

currentTime()

NUMERIC.days

NUMERIC.minutes

6. 聚合函数

SQL:

COUNT(*)

SUM([ ALL | DISTINCT ] expression)

RANK()

ROW_NUMBER()

Table API:

FIELD.count

FIELD.sum0

二、UDF

用户定义函数(User-defined Functions,UDF)是一个重要的特性,因为它们显著地扩展了查询(Query)的表达能力。一些系统内置函数无法解决的需求,我们可以用UDF来自定义实现。

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#table-functions

1. 注册用户自定义函数UDF

在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用。不需要专门为Scala 的Table API注册函数。

函数通过调用registerFunction()方法在TableEnvironment中注册。当用户定义的函数被注册时,它被插入到TableEnvironment的函数目录中,这样Table API或SQL解析器就可以识别并正确地解释它。

2. 标量函数(Scalar Functions)

用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值。

为了定义标量函数,必须在org.apache.flink.table.functions中扩展基类Scalar Function,并实现(一个或多个)求值(evaluation,eval)方法。标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval(直接public声明,没有override)。求值方法的参数类型和返回类型,确定了标量函数的参数和返回类型。

public static void main(String[] args) throws Exception {

//1.获取执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

//2.读取元素得到DataStream

DataStreamSource waterSensorDataStreamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),

new WaterSensor("sensor_1", 2000L, 20),

new WaterSensor("sensor_2", 3000L, 30),

new WaterSensor("sensor_1", 4000L, 40),

new WaterSensor("sensor_1", 5000L, 50),

new WaterSensor("sensor_2", 6000L, 60));

//3.将流转换为动态表

Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);

//4.不注册函数直接使用

// table.select(call(Mylenth.class,$("id"))).execute().print();

//4.1先注册再使用

tableEnv.createTemporarySystemFunction("MyLenth", Mylenth.class);

//TableAPI

// table.select(call("MyLenth", $("id"))).execute().print();

//SQL

tableEnv.executeSql("select MyLenth(id) from "+table).print();

}

//自定义UDF函数,求数据的长度

public static class Mylenth extends ScalarFunction{

public int eval(String value) {

return value.length();

}

}

3. 表函数(Table Functions)

与用户定义的标量函数类似,用户定义的表函数,可以将0、1或多个标量值作为输入参数;与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值。

为了定义一个表函数,必须扩展org.apache.flink.table.functions中的基类TableFunction并实现(一个或多个)求值方法。表函数的行为由其求值方法决定,求值方法必须是public的,并命名为eval。求值方法的参数类型,决定表函数的所有有效参数。

返回表的类型由TableFunction的泛型类型确定。求值方法使用protected collect(T)方法发出输出行。

在Table API中,Table函数需要与.joinLateral或.leftOuterJoinLateral一起使用。

joinLateral算子,会将外部表中的每一行,与表函数(TableFunction,算子的参数是它的表达式)计算得到的所有行连接起来。

而leftOuterJoinLateral算子,则是左外连接,它同样会将外部表中的每一行与表函数计算生成的所有行连接起来;并且,对于表函数返回的是空表的外部行,也要保留下来。

在SQL中,则需要使用Lateral Table(),或者带有ON TRUE条件的左连接。

下面的代码中,我们将定义一个表函数,在表环境中注册它,并在查询中调用它。

public static void main(String[] args) throws Exception {

//1.获取执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

//2.读取文件得到DataStream

DataStreamSource waterSensorDataStreamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),

new WaterSensor("sensor_1", 2000L, 20),

new WaterSensor("sensor_2", 3000L, 30),

new WaterSensor("sensor_1", 4000L, 40),

new WaterSensor("sensor_1", 5000L, 50),

new WaterSensor("sensor_2", 6000L, 60));

//3.将流转换为动态表

Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);

//4先注册再使用

tableEnv.createTemporarySystemFunction("split", SplitFunction.class);

//TableAPI

/* table

.joinLateral(call("split", $("id")))

.select($("id"),$("word"))

.execute()

.print();

*/

//SQL

tableEnv.executeSql("select id, word from "+table +", lateral table(split(id))").print();

}

//自定义UDTF函数将传入的id按照下划线炸裂成两条数据

//hint暗示,主要作用为类型推断时使用

@FunctionHint(output = @DataTypeHint("ROW"))

public static class SplitFunction extends TableFunction {

public void eval(String str) {

for (String s : str.split("_")) {

collect(Row.of(s));

}

}

}

4. 聚合函数(Aggregate Functions)

用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。用户定义的聚合函数,是通过继承AggregateFunction抽象类实现的。

 

上图中显示了一个聚合的例子。

假设现在有一张表,包含了各种饮料的数据。该表由三列(id、name和price)、五行组成数据。现在我们需要找到表中所有饮料的最高价格,即执行max()聚合,结果将是一个数值。

AggregateFunction的工作原理如下。

首先,它需要一个累加器,用来保存聚合中间结果的数据结构(状态)。可以通过调用AggregateFunction的createAccumulator()方法创建空累加器。

随后,对每个输入行调用函数的accumulate()方法来更新累加器。

处理完所有行后,将调用函数的getValue()方法来计算并返回最终结果。

AggregationFunction要求必须实现的方法:

createAccumulator()accumulate()getValue()

除了上述方法之外,还有一些可选择实现的方法。其中一些方法,可以让系统执行查询更有效率,而另一些方法,对于某些场景是必需的。例如,如果聚合函数应用在会话窗口(session group window)的上下文中,则merge()方法是必需的。

retract()merge()resetAccumulator()

接下来我们写一个自定义AggregateFunction,计算一下每个WaterSensor中VC的平均值。

public static void main(String[] args) throws Exception {

//1.获取执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

//2.读取文件得到DataStream

DataStreamSource waterSensorDataStreamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),

new WaterSensor("sensor_1", 2000L, 20),

new WaterSensor("sensor_2", 3000L, 30),

new WaterSensor("sensor_1", 4000L, 40),

new WaterSensor("sensor_1", 5000L, 50),

new WaterSensor("sensor_2", 6000L, 60));

//3.将流转换为动态表

Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);

//4先注册再使用

tableEnv.createTemporarySystemFunction("myavg", MyAvg.class);

//TableAPI

table.groupBy($("id"))

.select($("id"),call("myavg",$("vc")))

.execute()

.print();

//SQL

tableEnv.executeSql("select id, myavg(vc) from "+table +" group by id").print();

}

//定义一个类当做累加器,并声明总数和总个数这两个值

public static class MyAvgAccumulator{

public long sum = 0;

public int count = 0;

}

//自定义UDAF函数,求每个WaterSensor中VC的平均值

public static class MyAvg extends AggregateFunction {

//创建一个累加器

@Override

public MyAvgAccumulator createAccumulator() {

return new MyAvgAccumulator();

}

//做累加操作

public void accumulate(MyAvgAccumulator acc, Integer vc) {

acc.sum += vc;

acc.count += 1;

}

//将计算结果值返回

@Override

public Double getValue(MyAvgAccumulator accumulator) {

return accumulator.sum*1D/accumulator.count;

}

}

5. 表聚合函数(Table Aggregate Functions)

用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表。这跟AggregateFunction非常类似,只是之前聚合结果是一个标量值,现在变成了一张表。

 

比如现在我们需要找到表中所有WaterSensor的前2个最高水位线,即执行top2()表聚合。

用户定义的表聚合函数,是通过继承TableAggregateFunction抽象类来实现的。

TableAggregateFunction的工作原理如下。

首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用TableAggregateFunction的createAccumulator()方法可以创建空累加器。

随后,对每个输入行调用函数的accumulate()方法来更新累加器。

处理完所有行后,将调用函数的emitValue()方法来计算并返回最终结果。

AggregationFunction要求必须实现的方法:

createAccumulator()getValue()

除了上述方法之外,还有一些可选择实现的方法。

retract()merge()resetAccumulator()emitValue()emitUpdateWithRetract()

接下来我们写一个自定义TableAggregateFunction,用来提取每个WaterSensor最高的两个水位值。

public static void main(String[] args) throws Exception {

//1.获取执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

//2.读取文件得到DataStream

DataStreamSource waterSensorDataStreamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),

new WaterSensor("sensor_1", 2000L, 20),

new WaterSensor("sensor_2", 3000L, 30),

new WaterSensor("sensor_1", 4000L, 40),

new WaterSensor("sensor_1", 5000L, 50),

new WaterSensor("sensor_2", 6000L, 60));

//3.将流转换为动态表

Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);

//4先注册再使用

tableEnv.createTemporarySystemFunction("Top2", Top2.class);

//TableAPI

table.groupBy($("id"))

.flatAggregate(call("Top2", $("vc")).as("top", "rank"))

.select($("id"), $("top"), $("rank"))

.execute()

.print();

}

//定义一个类当做累加器,并声明第一和第二这两个值

public static class vCTop2 {

public Integer first = Integer.MIN_VALUE;

public Integer second = Integer.MIN_VALUE;

}

//自定义UDATF函数(多进多出),求每个WaterSensor中最高的两个水位值

public static class Top2 extends TableAggregateFunction {

//创建累加器

@Override

public vCTop2 createAccumulator() {

return new vCTop2();

}

//比较数据,如果当前数据大于累加器中存的数据则替换,并将原累加器中的数据往下(第二)赋值

public void accumulate(vCTop2 acc, Integer value) {

if (value > acc.first) {

acc.second = acc.first;

acc.first = value;

} else if (value > acc.second) {

acc.second = value;

}

}

//计算(排名)

public void emitValue(vCTop2 acc, Collector out) {

// emit the value and rank

if (acc.first != Integer.MIN_VALUE) {

out.collect(Tuple2.of(acc.first, 1));

}

if (acc.second != Integer.MIN_VALUE) {

out.collect(Tuple2.of(acc.second, 2));

}

}

}

总结

Flink提供众多系统内置函数:比较函数、逻辑函数、算术函数、字符串函数、时间函数、聚合函数。对于无法满足的需求,我们可以用UDF自定义函数解决。用户自定义聚合函数(UDA)可以把一个表中的数据,聚合成一个标量值。用户定义的表聚合函数(UDTA),可以把一个表中数据,聚合为具有多行和多列的结果表。跟聚合函数AggregateFunction非常类似,只是之前聚合结果是一个标量值,现在变成了一张表。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3