flink学习笔记1

您所在的位置:网站首页 flink输出到多个mysql表 flink学习笔记1

flink学习笔记1

2023-08-23 07:04| 来源: 网络整理| 查看: 265

概述

Flink 是一个开源的分布式,高性能,高可用,准确的流处理框架。支持实时流处理和批处理。其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务:

DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。

从部署上讲,Flink支持local模式、集群模式(standalone集群或者Yarn集群)、云端部署。

在一般的流处理程序中,会有三种处理语义

at most once : 至多一次,表示一条消息不管后续处理成功与否只会被消费处理一次,那么就存在数据丢失可能exactly once : 精确一次,表示一条消息从其消费到后续的处理成功,只会发生一次at least once :至少一次,表示一条消息从消费到后续的处理成功,可能会发生多次

flink的checkpoint机制能把程序处理的中间状态保存下来,当程序失败可以从最新的checkpoint中恢复,通过checkpoint的机制,flink可以实现精确一次和至少一次的语义。

flink官网的第一个程序:计算单词出现的次数 public class WindowWordCount { public static void main(String[] args) throws Exception { //创建flink流执行的环境,获取环境对象 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //添加一个输入流,这里是让程序监控本机9999端口,可以在本机安装nc程序,然后在控制台执行nc -lk 9999 DataStreamSource source = env.socketTextStream("localhost", 9999); //读入localhost:9999输入的数据,格式为单词,然后根据splitter类进行单词拆分 SingleOutputStreamOperator flatOperator = source.flatMap(new Splitter()); //将拆分好的单词 按照单词进行分组,注意这里的keyby参数可以为数字,表示flatOperator流的某一个字段 //也可以是字符串,表示flatOperator流表示的对象的字段名称 KeyedStream keyby = flatOperator.keyBy(0); //执行window操作,Windows包括计数窗口和时间窗口2大类,具体见窗口章节的说明,这里是一个时间窗口,窗口时间为5秒 //表示将keyby流中获得的数据缓存起来,缓存5秒后再一起执行 WindowedStream window = keyby.timeWindow(Time.seconds(5)); //将Windows中的数据进行运算,求和操作,sum参数和keyby参数类似,可以是序号也可以是具体某个字段 SingleOutputStreamOperator sum = window.sum(1); //将求和结果进行输出,这里是打印到屏幕,也可以输入文件和保存到数据库 sum.print(); //执行操作 env.execute("Window WordCount"); } public static class Splitter implements FlatMapFunction { @Override public void flatMap(String sentence, Collector out) throws Exception { for (String word: sentence.split(" ")) { out.collect(new Tuple2(word, 1)); } } } } flink程序的步骤 创建flink执行环境,并设置相关全局配置设置数据输入方式,一般对输入叫做dataSource 如从kafka读取、从某个端口读取将读取到的数据进行计算,每一个计算逻辑一般叫做算子,如map、flatMap、keyby、window、sum等将计算结果进行保存,一般对输出叫做dataSink 如保存到数据库或者打印到屏幕调用执行环境的执行操作 env.execute()

注意:上述步骤都是程序启动时就开始执行了,相当于是一个剧本,先确定好剧本,当数据到来时,从DataSource中获取数据,然后依次执行算子进行数据计算,最后将数据输出到datasink中

DataSource

flink提供了如下几种最基本的DataSource:

readTextFile、readFile:从文件中读取数据socketTextStream:从网络上读取数据fromCollection、fromElements、fromParallelCollection:从集合中读取数据addSource:用户自定义数据源,如kafka DataSink:

flink提供了如下几种基本的datasink:

writeAsText:当成文本写入外部文件writeAsCsv:写入Excel文件print:输出到屏幕writeUsingOutputFormat:输出到外部流中writeToSocket :输出到网络的某个socket上addSink:用户自定义输出,如数据库 kafka作为flink数据源 依赖包maven引入 org.apache.flink flink-connector-kafka_2.11 1.10.0 代码片段 public class KafkaSourceTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置kafka相关参数 当然 这是最基础的配置 还可以添加其他的配置 Properties props = new Properties(); props.setProperty("bootstrap.servers",""); props.setProperty("group.id", "groupId"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //如果禁用检查点,可以设置自动提交偏移量,true表示自动提交偏移量,且每隔1000提交一次 props.put("enable.auto.commit", true); props.put("auto.commit.interval.ms", 1000); //开启检查点,每1秒提交一次,且需要事先有且只有一次的消息消费 env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE); //创建kafka的DataSource 使用addsource方法 //反序列化方案 SimpleStringSchema 将字节流转换成简单的字符串 DataStreamSource kafkaSource = env.addSource(new FlinkKafkaConsumer010("mytopic", new SimpleStringSchema(), props)); //直接输出数据 kafkaSource.print(); //执行任务 env.execute(); } } 检查点知识 何为检查点:

就是将程序运行过程中的状态数据以快照的形式周期性的保存起来(后一次状态数据会覆盖前一次的状态数据),用于程序重启时恢复相关中间状态,每一次状态数据的保存即触发了一次检查点

如果启用检查点,props中配置的自动提交偏移量的配置将无效启用检查点,并设置检查点提交周期,最后设置检查点模式,精确一次还是最少一次启用检查点后,将在检查点提交后再提交kafka偏移量,以保证kafka中的偏移量和检查点中的偏移量一致当程序重启后,会先从检查点中获取偏移量,如果检查点中没有偏移量,则再从kafka中获取偏移量

检查点数据保存到哪里呢?默认保存到job manager的内存中,也可以在集群中进行配置,可以保存到文件系统中、HDFS中。

MySQL作为程序输出 代码片段如下: public class MysqlSink { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置kafka相关参数 当然 这是最基础的配置 还可以添加其他的配置 Properties props = new Properties(); props.setProperty("bootstrap.servers",""); props.setProperty("group.id", "groupId"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //创建kafka的DataSource 使用addsource方法 //反序列化方案 SimpleStringSchema 将字节流转换成简单的字符串 DataStreamSource kafkaSource = env.addSource(new FlinkKafkaConsumer010("mytopic", new SimpleStringSchema(), props)); //将字符串转换成对象 SingleOutputStreamOperator operator = kafkaSource.flatMap(new RichFlatMapFunction() { @Override public void flatMap(String value, Collector out) throws Exception { JavaBeanPo result = JSONObject.parseObject(value, JavaBeanPo.class); out.collect(Arrays.asList(result)); } }); //将结果保存到MySQL数据库中 operator.addSink(new RichSinkFunction() { protected Connection connection = null; protected PreparedStatement ps = null; DruidDataSource dataSource = null; //只执行一次,用于创建数据库连接 @Override public void open(Configuration parameters) throws Exception { dataSource = new DruidDataSource(); dataSource.setDriverClassName("com.mysql.jdbc.Driver"); dataSource.setUrl("jdbc:mysql://localhost:3306/test?autoReconnect=true&useUnicode=true&useAffectedRows=true&characterEncoding=utf8"); dataSource.setUsername(""); dataSource.setPassword(""); try { connection = dataSource.getConnection(); } catch (SQLException e) { e.printStackTrace(); } } //只执行一次,用于关闭数据库连接 @Override public void close() throws Exception { if (dataSource != null) { dataSource.close(); } if(connection!=null) { try { connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } //执行数据入库操作 @Override public void invoke(List results, Context context) throws Exception { //构建sql String sql = "INSERT INTO tablename (colum1,colum2,...column) " + " values (?,?,?,?,?,?) on duplicate key " + " update colum1= values(colum1),column=values(column) "; ps = connection.prepareStatement(sql); for(JavaBeanPo record : results){ ps.setObject(1, record.getXX()); ps.setObject(n, record.getXX()); ps.addBatch(); } ps.executeBatch(); } }); //执行任务 env.execute(); } }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3