Flink学习(一):Flink基本使用 |
您所在的位置:网站首页 › flink框架结构 › Flink学习(一):Flink基本使用 |
目录
1. 初识Flink1. Flink概述2. Flink Layered API
2. 快速上手开发第一个Flink应用程序1. 开发环境准备2. 使用Flink开发一个批处理应用程序3. 使用Flink开发一个实时处理应用程序
3. Flink编程模型及核心概念1. DataSet & DataStream2. Flink编程模型3. 系统架构
4. DataSet 基本API使用1. DataSet API开发概述2. DataSource3. Transformation4. Sink
5. DataStream 基本API使用1. Data Source2. Transformation3. Sink
6. Flink Table API & SQL编程1. 什么是Flink关系型API2. Table API & SQL开发概述3. Table API & SQL编程
最后
1. 初识Flink
1. Flink概述
Flink是什么 Apache Flink - Stateful Computations over Data StreamsApache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.Unbounded vs Bounded data 简单来说,Flink 是一个分布式的流处理框架,它能够对有界和无界的数据流进行高效的处理。Flink 的核心是流处理,当然它也能支持批处理,Flink 将批处理看成是流处理的一种特殊情况,即数据流是有明确界限的。这和 Spark Streaming 的思想是完全相反的,Spark Streaming 的核心是批处理,它将流处理看成是批处理的一种特殊情况, 即把数据流进行极小粒度的拆分,拆分为多个微批处理。需求描述 词频统计(word count) 一个文件,统计文件中每个单词出现的次数分隔符是\t统计结果我们直接打印在控制台(生产上肯定是Sink到目的地)Flink + Java 前置条件:The only requirements are working Maven 3.0.4 (or higher) and Java 8.x installations.创建方式 $ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.8.0 开发流程/开发八股文编程 set up the batch execution environmentreadtransform operations 开发核心所在:开发业务逻辑execute program 功能拆解 读取数据每一行的数据按照指定的分隔符拆分为每一个单词赋上次数为1合并操作 /** * 使用Java API来开发Flink的批处理应用程序 * Created by thpffcj on 2019-06-28. */ public class BatchWCJavaApp { public static void main(String[] args) throws Exception { String input = "file:///Users/thpffcj/Public/data/hello.txt"; // step1:获取运行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // step2:read data DataSource text = env.readTextFile(input); // step3:transform text.flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector collector) throws Exception { String[] tokens = value.toLowerCase().split("\t"); for (String token : tokens) { if (token.length() > 0) { collector.collect(new Tuple2(token, 1)); } } } }).groupBy(0).sum(1).print(); // step4: } } 3. 使用Flink开发一个实时处理应用程序Flink + Scala /** * 使用Scala开发Flink的实时处理应用程序 * Created by thpffcj on 2019-06-29. */ object StreamingWCScalaApp { def main(args: Array[String]): Unit = { // step1:获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // step2:读取数据 val text = env.socketTextStream("localhost", 9999) import org.apache.flink.api.scala._ // step3:transform text.flatMap(_.split(",")) .filter(_.nonEmpty) .map((_, 1)) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1).print() env.execute("StreamingWCScalaApp") } } 3. Flink编程模型及核心概念 1. DataSet & DataStream DataSet:批式处理,其接口封装类似于Spark的Dataset,支持丰富的函数操作,比如map/fliter/join/cogroup等。DataStram:流式处理,其结构封装实现输入流的处理,其也实现了丰富的函数支持。DataSet同DataStream从其接口封装、真实计算Operator有很大的差别,Dataset的实现在flink-javamodule中,而DataStream的实现在flink-streaming-java中。 2. Flink编程模型 Anatomy of a Flink Program 3. 系统架构Flink的搭建需要四个不同的组件:JobManager,ResourceManager,TaskManager和Dispatcher JobManagers (也称为 masters) :JobManagers 接收由 Dispatcher 传递过来的执行程序,该执行程序包含了作业图 (JobGraph),逻辑数据流图 (logical dataflow graph) 及其所有的 classes 文件以及第三方类库 (libraries) 等等 。紧接着 JobManagers 会将 JobGraph 转换为执行图 (ExecutionGraph),然后向 ResourceManager 申请资源来执行该任务,一旦申请到资源,就将执行图分发给对应的 TaskManagers 。因此每个作业 (Job) 至少有一个 JobManager;高可用部署下可以有多个 JobManagers,其中一个作为 leader,其余的则处于 standby 状态。ResourceManager :负责管理 slots 并协调集群资源。ResourceManager 接收来自 JobManager 的资源请求,并将存在空闲 slots 的 TaskManagers 分配给 JobManager 执行任务。Flink 基于不同的部署平台,如 YARN , Mesos,K8s 等提供了不同的资源管理器,当 TaskManagers 没有足够的 slots 来执行任务时,它会向第三方平台发起会话来请求额外的资源。TaskManagers (也称为 workers) : TaskManagers 负责实际的子任务 (subtasks) 的执行,每个 TaskManagers 都拥有一定数量的 slots。Slot 是一组固定大小的资源的合集 (如计算能力,存储空间)。TaskManagers 启动后,会将其所拥有的 slots 注册到 ResourceManager 上,由 ResourceManager 进行统一管理。Dispatcher:负责接收客户端提交的执行程序,并传递给 JobManager 。除此之外,它还提供了一个 WEB UI 界面,用于监控作业的执行情况。任务执行 一个 TaskManager 允许同时执行多个任务。这些任务可以属于用一个算子(数据并行),也可以是不同算子(任务并行),甚至还可以来自不同的应用(作业并行)。 高可用性设置 为了从故障中恢复,系统首先要重启故障进程,随后需要重启应用并恢复其状态。 4. DataSet 基本API使用 1. DataSet API开发概述最简单的通用的数据处理流程应该很容易想到,就是有一个数据输入,Source;一个数据处理,Transform;还有一个是数据输出,也就是Sink。 2. DataSource 基于文件基于集合从集合创建dataset Scala实现 object DataSetDataSourceApp { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment fromCollection(env) } def fromCollection(env: ExecutionEnvironment): Unit = { import org.apache.flink.api.scala._ val data = 1 to 10 env.fromCollection(data).print() } } Java实现 public class JavaDataSetDataSourceApp { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); fromCollection(env); } public static void fromCollection(ExecutionEnvironment env) throws Exception { List list = new ArrayList(); for (int i = 1; i { (first._1, first._2, second._2) }).print() } 4. Sink数据接收器消费数据集,数据接收器操作使用 OutputFormat 描述。 object DataSetSinkApp { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val data = 1.to(10) val text = env.fromCollection(data) val filePath = "file:///Users/thpffcj/Public/data/sink-out" text.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(2) env.execute("DataSetSinkApp") } } 5. DataStream 基本API使用 1. Data SourceSocket-based object DataStreamSourceApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment socketFunction(env) env.execute("DataStreamSourceApp") } def socketFunction(env: StreamExecutionEnvironment): Unit = { val data = env.socketTextStream("localhost", 9999) data.print() } } 我们使用netcat发送数据 nc -lk 9999Custom:实现自定义数据源 方式一:implementing the SourceFunction for non-parallel sources class CustomNonParallelSourceFunction extends SourceFunction[Long]{ var count = 1L var isRunning = true override def run(ctx: SourceFunction.SourceContext[Long]): Unit = { while (isRunning) { ctx.collect(count) count += 1 Thread.sleep(1000) } } override def cancel(): Unit = { isRunning = false } } 接着我们需要将数据源添加到环境中 object DataStreamSourceApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment nonParallelSourceFunction(env) env.execute("DataStreamSourceApp") } def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = { val data = env.addSource(new CustomNonParallelSourceFunction) data.print() } } 方式二:implementing the ParallelSourceFunction interface class CustomParallelSourceFunction extends ParallelSourceFunction[Long] 这种方式我们可以设置并行度了 def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = { val data = env.addSource(new CustomParallelSourceFunction).setParallelism(2) data.print() } 方式三:extending the RichParallelSourceFunction for parallel sources class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] 2. Transformation可以将一个或多个 DataStream 转换为新的 DataStream Map和Filter 我们直接使用上面的自定义数据源产生数据 def filterFunction(env: StreamExecutionEnvironment): Unit = { val data = env.addSource(new CustomNonParallelSourceFunction) data.map(x =>{ println("received: " + x) x }).filter(_%2 == 0).print().setParallelism(1) }Union Union 可以将多个流合并到一个流中,以便对合并的流进行统一处理。 def unionFunction(env: StreamExecutionEnvironment): Unit = { val data1 = env.addSource(new CustomNonParallelSourceFunction) val data2 = env.addSource(new CustomNonParallelSourceFunction) data1.union(data2).print().setParallelism(1) }Split和Select Split 将一个流拆分为多个流。 Select 从拆分流中选择一个或多个流。 def splitSelectFunction(env: StreamExecutionEnvironment): Unit = { val data = env.addSource(new CustomNonParallelSourceFunction) val splits = data.split(new OutputSelector[Long] { override def select(value: Long): lang.Iterable[String] = { val list = new util.ArrayList[String]() if (value % 2 == 0) { list.add("even") } else { list.add("odd") } list } }) splits.select("even").print().setParallelism(1) } 3. Sink自定义Sink 需求:socket发送数据过来,把String类型转换成对象,然后把Java对象保存到MySQL数据库中数据库建表 create table student( id int(11) NOT NULL AUTO_INCREMENT, name varchar(25), age int(10), primary key(id) ); 继承RichSinkFunction,T是你想要写入对象的类型重写open/close:生命周期方法重写invoke:每条记录执行一次 public class SinkToMySQL extends RichSinkFunction { Connection connection; PreparedStatement preparedStatement; private Connection getConnection() { Connection conn = null; try { String url = "jdbc:mysql://localhost:3306/test"; conn = DriverManager.getConnection(url, "root", "00000000"); } catch (Exception e) { e.printStackTrace(); } return conn; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql = "insert into Student(id, name, age) values (?, ?, ?)"; preparedStatement = connection.prepareStatement(sql); } // 每条记录插入时调用一次 public void invoke(Student value, Context context) throws Exception { // 为前面的占位符赋值 preparedStatement.setInt(1, value.getId()); preparedStatement.setString(2, value.getName()); preparedStatement.setInt(3, value.getAge()); preparedStatement.executeUpdate(); } @Override public void close() throws Exception { if(connection != null) { try { connection.close(); } catch(Exception e) { e.printStackTrace(); } connection = null; } } } 开发测试方法 public class JavaCustomSinkToMySQL { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource source = env.socketTextStream("localhost", 7777); SingleOutputStreamOperator studentStream = source.map(new MapFunction() { @Override public Student map(String value) throws Exception { System.out.println(value); String[] splits = value.split(","); Student stu = new Student(); stu.setId(Integer.parseInt(splits[0])); stu.setName(splits[1]); stu.setAge(Integer.parseInt(splits[2])); return stu; } }); studentStream.addSink(new SinkToMySQL()); env.execute("JavaCustomSinkToMySQL"); } } 6. Flink Table API & SQL编程你不能要求业务使用人员都会编程,因此一个好的框架都会提供Table API方便人们的使用,人们可以通过SQL来完成自己的任务。 1. 什么是Flink关系型API大家可以关注我的微信公众号一起学习进步。 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |