Flink基础学习(Scala):数据转换算子Transform

您所在的位置:网站首页 表白1分钟 Flink基础学习(Scala):数据转换算子Transform

Flink基础学习(Scala):数据转换算子Transform

#Flink基础学习(Scala):数据转换算子Transform| 来源: 网络整理| 查看: 265

文章目录 一、前言二、算子2.1 简单算子2.2 聚合算子2.3 多流转换算子2.3.1 分流2.3.2 合流 三、总结

一、前言

前言

二、算子 2.1 简单算子

常见的简单算子有map、flatmap、filter等等,下面通过代码来实现一下

import org.apache.flink.streaming.api.scala._ object StreamTransform { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置并行度为1 env.setParallelism(1) // 从集合中获取不同数据类型数据 val dataStream1 = env.fromCollection(List(1, 2, 3)) // 对每个数都乘以2 val resultStream1 = dataStream1.map(data => data * 2) resultStream1.print("resultStream1") val dataStream2 = env.fromCollection(List("hello world", "hello flink", "hello spark")) val resultStream2 = dataStream2.flatMap(_.split(" ")) resultStream2.print("resultStream2") val resultStream3 = dataStream1.filter(_%2==0) resultStream3.print("resultStream3") env.execute("Stream Transform") } } 2.2 聚合算子

常见的聚合算子有keyBy、min、max、sum、minBy、maxBy、reduce等

import org.apache.flink.streaming.api.scala._ case class Sensor(id:String, temp:Double, time:Timestamp) object StreamTransform { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val dataStream = env.fromCollection(List( Sensor("sensor_1", 37.8, new Timestamp(1584864313644L)), Sensor("sensor_2", 32.9, new Timestamp(1584864313646L)), Sensor("sensor_3", 33.4, new Timestamp(1584864313648L)), Sensor("sensor_1", 29.5, new Timestamp(1584864313645L)), Sensor("sensor_1", 32.1, new Timestamp(1584864313649L)) )) val resultStream = dataStream.keyBy(data => data.id) // .min("temp") // .minBy("temp") .reduce((currdata, newdata) => Sensor(currdata.id, currdata.temp.min(newdata.temp), newdata.time)) resultStream.print("resultStream") env.execute("Stream Transform")

说一下上面的算子,keyBy是根据key分组,min、max、minBy、maxBy、sum这些算子是求最小值、最大值、求和,而reduce是通用的聚合算子,他可以实现这五个算子的功能,这里重要说一下min和minBy的区别,这两者其他功能都是一样的,唯一的一点区别minBy会更新符合条件的整体数据,而min只会更新符合条件的那个字段

2.3 多流转换算子 2.3.1 分流

说到分流,在Flink 1.12.0之前可以通过split和select来分流,但在Flink1.12.0之后这两个算子已经弃用,在介绍正式的分流算子前先说一下其他我们之前也接触过分流算子filter,不过这种效率比较低,分成多个流需要重复执行多次,好了接下来说说分流使用的是process算子

import java.sql.Timestamp import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector case class Sensor(id: String, temp: Double, time: Timestamp) object StreamTransform { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val dataStream = env.fromCollection(List( Sensor("sensor_1", 37.8, new Timestamp(1584864313644L)), Sensor("sensor_2", 32.9, new Timestamp(1584864313646L)), Sensor("sensor_3", 33.4, new Timestamp(1584864313648L)), Sensor("sensor_1", 29.5, new Timestamp(1584864313645L)), Sensor("sensor_1", 32.1, new Timestamp(1584864313649L)) )) val splitStream = dataStream.process(new ProcessFunction[Sensor, Sensor] { override def processElement(i: Sensor, context: ProcessFunction[Sensor, Sensor]#Context, collector: Collector[Sensor]): Unit = { // 放到了侧输出流,需要使用getSideOutput获取 if (i.temp > 30.0) context.output(new OutputTag[Sensor]("high"), i) else context.output(new OutputTag[Sensor]("low"), i) // 放到了主输出流,可以直接print collector.collect(i) } }) splitStream.print("all") splitStream.getSideOutput(new OutputTag[Sensor]("high")).print("high") splitStream.getSideOutput(new OutputTag[Sensor]("low")).print("low") env.execute("Stream Transform") } } 2.3.2 合流

合流有两种方式,一种是使用connect,另外一种是union,两个的区别是union的数据类型必须一样,而connect数据类型可以不同,这里根据分流的例子来说明,核心代码如下

// 分流后的高温流 val highStream = splitStream.getSideOutput(new OutputTag[Sensor]("high")) // 分流后的低温流 val lowStream = splitStream.getSideOutput(new OutputTag[Sensor]("low")) // 高温流转换成警告流 val warnStream = highStream.map(data => (data.id, data.temp)) // 通过connect将高温流和警告流合并起来,类型可以不同,在使用comap转换成DataStream类型流 val connectStream = highStream.connect(warnStream) val coMapStream = connectStream.map( highdata => (highdata.id, highdata.temp), warndata => (warndata._1, warndata._2, "warnning") ) coMapStream.print("connect") // 通过union将两个类型相同的流合并起来 val unionStream = highStream.union(lowStream) unionStream.print("union") 三、总结

以上就是今天要讲的内容,本文的重点就是分流和合流,要重点掌握



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3