Flink学习(一):Flink基本使用

您所在的位置:网站首页 flink框架结构 Flink学习(一):Flink基本使用

Flink学习(一):Flink基本使用

2023-08-10 02:02| 来源: 网络整理| 查看: 265

目录 1. 初识Flink1. Flink概述2. Flink Layered API 2. 快速上手开发第一个Flink应用程序1. 开发环境准备2. 使用Flink开发一个批处理应用程序3. 使用Flink开发一个实时处理应用程序 3. Flink编程模型及核心概念1. DataSet & DataStream2. Flink编程模型3. 系统架构 4. DataSet 基本API使用1. DataSet API开发概述2. DataSource3. Transformation4. Sink 5. DataStream 基本API使用1. Data Source2. Transformation3. Sink 6. Flink Table API & SQL编程1. 什么是Flink关系型API2. Table API & SQL开发概述3. Table API & SQL编程 最后

1. 初识Flink 1. Flink概述

Flink是什么

Apache Flink - Stateful Computations over Data StreamsApache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.

Unbounded vs Bounded data

简单来说,Flink 是一个分布式的流处理框架,它能够对有界和无界的数据流进行高效的处理。Flink 的核心是流处理,当然它也能支持批处理,Flink 将批处理看成是流处理的一种特殊情况,即数据流是有明确界限的。这和 Spark Streaming 的思想是完全相反的,Spark Streaming 的核心是批处理,它将流处理看成是批处理的一种特殊情况, 即把数据流进行极小粒度的拆分,拆分为多个微批处理。

都可以使用Flink来处理,对应的就是流处理和批处理。

2. Flink Layered API Flink 采用分层的架构设计,从而保证各层在功能和职责上的清晰。

SQL & Table API SQL & Table API 同时适用于批处理和流处理,这意味着你可以对有界数据流和无界数据流以相同的语义进行查询,并产生相同的结果。除了基本查询外, 它还支持自定义的标量函数,聚合函数以及表值函数,可以满足多样化的查询需求。 DataStream & DataSet API DataStream & DataSet API 是 Flink 数据处理的核心 API,支持使用 Java 语言或 Scala 语言进行调用,提供了数据读取,数据转换和数据输出等一系列常用操作的封装。 Stateful Stream Processing Stateful Stream Processing 是最低级别的抽象,它通过 Process Function 函数内嵌到 DataStream API 中。 Process Function 是 Flink 提供的最底层 API,具有最大的灵活性,允许开发者对于时间和状态进行细粒度的控制。 2. 快速上手开发第一个Flink应用程序 1. 开发环境准备 JDKMaven 2. 使用Flink开发一个批处理应用程序

需求描述

词频统计(word count) 一个文件,统计文件中每个单词出现的次数分隔符是\t统计结果我们直接打印在控制台(生产上肯定是Sink到目的地)

Flink + Java

前置条件:The only requirements are working Maven 3.0.4 (or higher) and Java 8.x installations.创建方式 $ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.8.0 开发流程/开发八股文编程 set up the batch execution environmentreadtransform operations 开发核心所在:开发业务逻辑execute program 功能拆解 读取数据每一行的数据按照指定的分隔符拆分为每一个单词赋上次数为1合并操作 /** * 使用Java API来开发Flink的批处理应用程序 * Created by thpffcj on 2019-06-28. */ public class BatchWCJavaApp { public static void main(String[] args) throws Exception { String input = "file:///Users/thpffcj/Public/data/hello.txt"; // step1:获取运行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // step2:read data DataSource text = env.readTextFile(input); // step3:transform text.flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector collector) throws Exception { String[] tokens = value.toLowerCase().split("\t"); for (String token : tokens) { if (token.length() > 0) { collector.collect(new Tuple2(token, 1)); } } } }).groupBy(0).sum(1).print(); // step4: } } 3. 使用Flink开发一个实时处理应用程序

Flink + Scala

/** * 使用Scala开发Flink的实时处理应用程序 * Created by thpffcj on 2019-06-29. */ object StreamingWCScalaApp { def main(args: Array[String]): Unit = { // step1:获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // step2:读取数据 val text = env.socketTextStream("localhost", 9999) import org.apache.flink.api.scala._ // step3:transform text.flatMap(_.split(",")) .filter(_.nonEmpty) .map((_, 1)) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1).print() env.execute("StreamingWCScalaApp") } } 3. Flink编程模型及核心概念 1. DataSet & DataStream DataSet:批式处理,其接口封装类似于Spark的Dataset,支持丰富的函数操作,比如map/fliter/join/cogroup等。DataStram:流式处理,其结构封装实现输入流的处理,其也实现了丰富的函数支持。DataSet同DataStream从其接口封装、真实计算Operator有很大的差别,Dataset的实现在flink-javamodule中,而DataStream的实现在flink-streaming-java中。 2. Flink编程模型 Anatomy of a Flink Program 3. 系统架构

Flink架构

Flink的搭建需要四个不同的组件:JobManager,ResourceManager,TaskManager和Dispatcher

JobManagers (也称为 masters) :JobManagers 接收由 Dispatcher 传递过来的执行程序,该执行程序包含了作业图 (JobGraph),逻辑数据流图 (logical dataflow graph) 及其所有的 classes 文件以及第三方类库 (libraries) 等等 。紧接着 JobManagers 会将 JobGraph 转换为执行图 (ExecutionGraph),然后向 ResourceManager 申请资源来执行该任务,一旦申请到资源,就将执行图分发给对应的 TaskManagers 。因此每个作业 (Job) 至少有一个 JobManager;高可用部署下可以有多个 JobManagers,其中一个作为 leader,其余的则处于 standby 状态。ResourceManager :负责管理 slots 并协调集群资源。ResourceManager 接收来自 JobManager 的资源请求,并将存在空闲 slots 的 TaskManagers 分配给 JobManager 执行任务。Flink 基于不同的部署平台,如 YARN , Mesos,K8s 等提供了不同的资源管理器,当 TaskManagers 没有足够的 slots 来执行任务时,它会向第三方平台发起会话来请求额外的资源。TaskManagers (也称为 workers) : TaskManagers 负责实际的子任务 (subtasks) 的执行,每个 TaskManagers 都拥有一定数量的 slots。Slot 是一组固定大小的资源的合集 (如计算能力,存储空间)。TaskManagers 启动后,会将其所拥有的 slots 注册到 ResourceManager 上,由 ResourceManager 进行统一管理。Dispatcher:负责接收客户端提交的执行程序,并传递给 JobManager 。除此之外,它还提供了一个 WEB UI 界面,用于监控作业的执行情况。

任务执行

一个 TaskManager 允许同时执行多个任务。这些任务可以属于用一个算子(数据并行),也可以是不同算子(任务并行),甚至还可以来自不同的应用(作业并行)。

算子任务以及处理槽

上图由于算子最大并行度是4,因此应用若要执行则至少需要4个处理槽。将任务以切片的形式调度至处理槽有一个好处:TaskManager中的多个任务可以在同一进程内高效地执行数据交换而无需访问网络。

高可用性设置

为了从故障中恢复,系统首先要重启故障进程,随后需要重启应用并恢复其状态。 4. DataSet 基本API使用 1. DataSet API开发概述

最简单的通用的数据处理流程应该很容易想到,就是有一个数据输入,Source;一个数据处理,Transform;还有一个是数据输出,也就是Sink。

2. DataSource 基于文件基于集合

从集合创建dataset

Scala实现 object DataSetDataSourceApp { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment fromCollection(env) } def fromCollection(env: ExecutionEnvironment): Unit = { import org.apache.flink.api.scala._ val data = 1 to 10 env.fromCollection(data).print() } } Java实现 public class JavaDataSetDataSourceApp { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); fromCollection(env); } public static void fromCollection(ExecutionEnvironment env) throws Exception { List list = new ArrayList(); for (int i = 1; i { (first._1, first._2, second._2) }).print() } 4. Sink

数据接收器消费数据集,数据接收器操作使用 OutputFormat 描述。

object DataSetSinkApp { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val data = 1.to(10) val text = env.fromCollection(data) val filePath = "file:///Users/thpffcj/Public/data/sink-out" text.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(2) env.execute("DataSetSinkApp") } } 5. DataStream 基本API使用 1. Data Source

Socket-based

object DataStreamSourceApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment socketFunction(env) env.execute("DataStreamSourceApp") } def socketFunction(env: StreamExecutionEnvironment): Unit = { val data = env.socketTextStream("localhost", 9999) data.print() } } 我们使用netcat发送数据 nc -lk 9999

Custom:实现自定义数据源

方式一:implementing the SourceFunction for non-parallel sources class CustomNonParallelSourceFunction extends SourceFunction[Long]{ var count = 1L var isRunning = true override def run(ctx: SourceFunction.SourceContext[Long]): Unit = { while (isRunning) { ctx.collect(count) count += 1 Thread.sleep(1000) } } override def cancel(): Unit = { isRunning = false } } 接着我们需要将数据源添加到环境中 object DataStreamSourceApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment nonParallelSourceFunction(env) env.execute("DataStreamSourceApp") } def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = { val data = env.addSource(new CustomNonParallelSourceFunction) data.print() } } 方式二:implementing the ParallelSourceFunction interface class CustomParallelSourceFunction extends ParallelSourceFunction[Long] 这种方式我们可以设置并行度了 def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = { val data = env.addSource(new CustomParallelSourceFunction).setParallelism(2) data.print() } 方式三:extending the RichParallelSourceFunction for parallel sources class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] 2. Transformation

可以将一个或多个 DataStream 转换为新的 DataStream

Map和Filter

我们直接使用上面的自定义数据源产生数据 def filterFunction(env: StreamExecutionEnvironment): Unit = { val data = env.addSource(new CustomNonParallelSourceFunction) data.map(x =>{ println("received: " + x) x }).filter(_%2 == 0).print().setParallelism(1) }

Union

Union 可以将多个流合并到一个流中,以便对合并的流进行统一处理。

def unionFunction(env: StreamExecutionEnvironment): Unit = { val data1 = env.addSource(new CustomNonParallelSourceFunction) val data2 = env.addSource(new CustomNonParallelSourceFunction) data1.union(data2).print().setParallelism(1) }

Split和Select

Split 将一个流拆分为多个流。

Select 从拆分流中选择一个或多个流。

def splitSelectFunction(env: StreamExecutionEnvironment): Unit = { val data = env.addSource(new CustomNonParallelSourceFunction) val splits = data.split(new OutputSelector[Long] { override def select(value: Long): lang.Iterable[String] = { val list = new util.ArrayList[String]() if (value % 2 == 0) { list.add("even") } else { list.add("odd") } list } }) splits.select("even").print().setParallelism(1) } 3. Sink

自定义Sink

需求:socket发送数据过来,把String类型转换成对象,然后把Java对象保存到MySQL数据库中数据库建表 create table student( id int(11) NOT NULL AUTO_INCREMENT, name varchar(25), age int(10), primary key(id) ); 继承RichSinkFunction,T是你想要写入对象的类型重写open/close:生命周期方法重写invoke:每条记录执行一次 public class SinkToMySQL extends RichSinkFunction { Connection connection; PreparedStatement preparedStatement; private Connection getConnection() { Connection conn = null; try { String url = "jdbc:mysql://localhost:3306/test"; conn = DriverManager.getConnection(url, "root", "00000000"); } catch (Exception e) { e.printStackTrace(); } return conn; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql = "insert into Student(id, name, age) values (?, ?, ?)"; preparedStatement = connection.prepareStatement(sql); } // 每条记录插入时调用一次 public void invoke(Student value, Context context) throws Exception { // 为前面的占位符赋值 preparedStatement.setInt(1, value.getId()); preparedStatement.setString(2, value.getName()); preparedStatement.setInt(3, value.getAge()); preparedStatement.executeUpdate(); } @Override public void close() throws Exception { if(connection != null) { try { connection.close(); } catch(Exception e) { e.printStackTrace(); } connection = null; } } } 开发测试方法 public class JavaCustomSinkToMySQL { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource source = env.socketTextStream("localhost", 7777); SingleOutputStreamOperator studentStream = source.map(new MapFunction() { @Override public Student map(String value) throws Exception { System.out.println(value); String[] splits = value.split(","); Student stu = new Student(); stu.setId(Integer.parseInt(splits[0])); stu.setName(splits[1]); stu.setAge(Integer.parseInt(splits[2])); return stu; } }); studentStream.addSink(new SinkToMySQL()); env.execute("JavaCustomSinkToMySQL"); } } 6. Flink Table API & SQL编程

你不能要求业务使用人员都会编程,因此一个好的框架都会提供Table API方便人们的使用,人们可以通过SQL来完成自己的任务。

1. 什么是Flink关系型API

Layered-APIs

DataSet & DataStream API 熟悉两套API:DataSet/DataStream MapReduce => Hive SQLSpark => Spark SQLFlink => SQL Flink是支持批处理/流处理,如何做到API层面的统一 Table & SQL API:关系型API 2. Table API & SQL开发概述 Apache Flink features two relational APIs - the Table API and SQL - for unified stream and batch processing.The Table API is a language-integrated query API for Scala and Java that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way. 3. Table API & SQL编程 准备一份数据 Thpffcj:data thpffcj$ cat sales.csv transactionId,customerId,itemId,amountPaid 111,1,1,100.0 112,2,2,505.0 113,3,3,510.0 114,4,4,600.0 115,1,2,500.0 public class JavaTableSQLAPI { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env); String filePath = "file:///Users/thpffcj/Public/data/sales.csv"; // 已经拿到DataSet DataSet csv = env.readCsvFile(filePath) .ignoreFirstLine() .pojoType(Sales.class, "transactionId", "customerId", "itemId", "amountPaid"); // DataSet => Table Table sales = tableEnv.fromDataSet(csv); // Table => table tableEnv.registerTable("sales", sales); // sql Table resultTable = tableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId"); DataSet result = tableEnv.toDataSet(resultTable, Row.class); result.print(); } public static class Sales { public String transactionId; public String customerId; public String itemId; public Double amountPaid; } } 最后

大家可以关注我的微信公众号一起学习进步。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3