Flink CDC: Real-Time Sync from SQL Server to MySQL


1. Overview

This post uses flink-connector-sqlserver-cdc 2.3.0 to sync data from SQL Server to MySQL in real time. The full code is included below.

2. Environment

2.1. SQL Server

SQL Server must be version 14 or later, i.e. SQL Server 2017 or newer.
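If you are not sure which version a server is running, a quick check (SERVERPROPERTY and @@VERSION are standard SQL Server built-ins):

```sql
-- SQL Server 2017 reports a ProductVersion starting with 14.
SELECT SERVERPROPERTY('ProductVersion') AS product_version,
       SERVERPROPERTY('ProductLevel')   AS product_level,
       @@VERSION                        AS full_version;
```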

2.2. MySQL

I used MySQL 8.0.33; any MySQL version from 5.x through 8.x should work.

2.3. flink-connector-sqlserver-cdc

Because the source is SQL Server, Flink CDC 2.2 or later is required: SQL Server support was only added in Flink CDC 2.2, and the connector has strict requirements on the SQL Server version (see 2.1). This demo uses flink-connector-sqlserver-cdc 2.3.0.

3. Implementation

3.1. SQL Server database setup

Prerequisites:

- CDC is enabled on the SQL Server database.
- The SQL Server Agent is running.
- You are the db_owner of the database.

3.2. SQL Server database script

```sql
-- Create the database
create database zy_erp_cdc;

-- Run the following on the database where CDC should be enabled
if exists(select 1 from sys.databases where name='zy_erp_cdc' and is_cdc_enabled=0)
begin
    exec sys.sp_cdc_enable_db
end

-- Check the database's CDC status; a result of 1 means CDC was enabled successfully.
select is_cdc_enabled from sys.databases where name='zy_erp_cdc';

-- It is best to create a dedicated filegroup for the change tables first
-- (the script below references a filegroup named CDC1, which must already exist).

-- Create the table
create table zy_erp_cdc.dbo.student(
    id int primary key,
    name varchar(50),
    age int,
    mark int
);

-- Enable CDC on the table
USE zy_erp_cdc
GO
EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',       -- Schema of the source table.
    @source_name = 'student',     -- Name of the table to capture.
    @role_name = NULL,            -- Role whose members get SELECT permission on the captured columns.
                                  -- Set to NULL so that only members of sysadmin or db_owner have
                                  -- full access to the captured information.
    @filegroup_name = 'CDC1',     -- Filegroup for the change table; it must already exist, and it is
                                  -- best not to use the same filegroup as the source tables.
    @supports_net_changes = 0
GO

-- Test statements
select * from zy_erp_cdc.dbo.student;
insert into dbo.student values(1,'小黑',18,1);
update dbo.student set name = '小白' where id = 1;
delete dbo.student where id=1;
```

3.3. MySQL database script

```sql
create database zy_ods;

create table zy_ods.student (
    id int not null primary key,
    name varchar(50) null,
    age int null,
    mark int null
);
```

3.4. Data format captured by Flink CDC

Each change event arrives as a Debezium-style JSON document. The op field identifies the operation: c = insert, u = update, d = delete (r marks rows read during the initial snapshot).

3.4.1. Insert

```json
{"before":null,"after":{"id":1,"name":"小黑","age":18,"mark":1},"source":{"version":"1.6.4.Final","connector":"sqlserver","name":"sqlserver_transaction_log_source","ts_ms":1683209454617,"snapshot":"false","db":"zy_erp_cdc","sequence":null,"schema":"dbo","table":"student","change_lsn":"00000028:00000968:001b","commit_lsn":"00000028:00000968:001c","event_serial_no":1},"op":"c","ts_ms":1683180664795,"transaction":null}
```

3.4.2. Update

```json
{"before":{"id":1,"name":"小黑","age":18,"mark":1},"after":{"id":1,"name":"小白","age":18,"mark":1},"source":{"version":"1.6.4.Final","connector":"sqlserver","name":"sqlserver_transaction_log_source","ts_ms":1683209589897,"snapshot":"false","db":"zy_erp_cdc","sequence":null,"schema":"dbo","table":"student","change_lsn":"00000028:00000d00:0002","commit_lsn":"00000028:00000d00:0003","event_serial_no":2},"op":"u","ts_ms":1683180799648,"transaction":null}
```

3.4.3. Delete

```json
{"before":{"id":1,"name":"小白","age":18,"mark":1},"after":null,"source":{"version":"1.6.4.Final","connector":"sqlserver","name":"sqlserver_transaction_log_source","ts_ms":1683209644903,"snapshot":"false","db":"zy_erp_cdc","sequence":null,"schema":"dbo","table":"student","change_lsn":"00000028:00000d68:0002","commit_lsn":"00000028:00000d68:0005","event_serial_no":1},"op":"d","ts_ms":1683180855132,"transaction":null}
```
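Before wiring these events into Flink, it can help to see how they parse as plain JSON. Below is a minimal standalone sketch using fastjson (the same library declared in the pom in section 4.2); the ChangeEventDemo class name and the abbreviated JSON literal are only for illustration:

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class ChangeEventDemo {
    public static void main(String[] args) {
        // Abbreviated insert event from section 3.4.1 (most source fields trimmed)
        String event = "{\"before\":null,"
                + "\"after\":{\"id\":1,\"name\":\"小黑\",\"age\":18,\"mark\":1},"
                + "\"source\":{\"db\":\"zy_erp_cdc\",\"schema\":\"dbo\",\"table\":\"student\"},"
                + "\"op\":\"c\"}";

        JSONObject json = JSON.parseObject(event);
        String op = json.getString("op");                       // "c" -> insert
        String table = json.getJSONObject("source").getString("table");
        JSONObject after = json.getJSONObject("after");         // row image after the change

        System.out.printf("op=%s, table=%s, id=%d, name=%s%n",
                op, table, after.getIntValue("id"), after.getString("name"));
    }
}
```

The CommonBeanMap class in section 4.3 does the same job inside the Flink pipeline, but deserializes directly into the CommonBean POJO instead of a JSONObject.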

4. Code implementation

4.1. Project structure

The project contains the following packages and classes (full sources below):

- com.lhw.bean: CommonBean (generic change-event POJO)
- com.lhw.config: CollectionConfig (loads dev/config.properties)
- com.lhw.inter: ProcessDataInterface
- com.lhw.job: CollectionStream (main class / job entry point)
- com.lhw.map: CommonBeanMap, StudentMap
- com.lhw.sink: SinkMysql
- com.lhw.task: StudentTask
- com.lhw.util: DbUtil

4.2. pom.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>lhw.com</groupId>
    <artifactId>flink-cdc-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- flink.version is referenced below; the name of the 2.11.0 property is assumed (likely the Scala version) -->
        <scala.version>2.11.0</scala.version>
        <flink.version>1.14.4</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.4</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.29</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
    </dependencies>
</project>
```

4.3. Java code

CommonBean, the generic change-event POJO that every captured record is mapped to:

```java
package com.lhw.bean;

import lombok.Data;

import java.util.Map;

/**
 * @ClassName CommonBean
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Generic change-event class (feasibility test)
 * @Date 2023/5/5 10:43
 */
@Data
public class CommonBean {
    private Map<String, String> before;
    private Map<String, String> after;
    private Map<String, String> source;
    private String op;
    private String ts_ms;
    private String transaction;
}
```

CollectionConfig, which loads dev/config.properties from the classpath:

```java
package com.lhw.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;

/**
 * @ClassName CollectionConfig
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Loads the configuration file
 * @Date 2023/5/5 9:58
 */
public class CollectionConfig {

    private static final Logger LOG = LoggerFactory.getLogger(CollectionConfig.class);

    public final static Properties config = new Properties();

    static {
        InputStream profile = CollectionConfig.class.getClassLoader().getResourceAsStream("dev/config.properties");
        try {
            config.load(profile);
        } catch (IOException e) {
            LOG.info("load profile error!");
        }
        for (Map.Entry<Object, Object> kv : config.entrySet()) {
            LOG.info(kv.getKey() + "=" + kv.getValue());
        }
    }

//    public static void main(String[] args) {
//        String property = config.getProperty("mysql.driver");
//        System.out.println("------------------" + property);
//    }
}
```
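The config file itself is not shown in the original post. Below is a minimal sketch of dev/config.properties using only the property keys actually referenced by the code; all values are placeholders and must be adapted to your environment:

```properties
# SQL Server source (values are placeholders)
sqlserver.hostname=localhost
sqlserver.port=1433
sqlserver.database=zy_erp_cdc
sqlserver.tableList=dbo.student
sqlserver.username=sa
sqlserver.password=yourStrongPassword

# MySQL target
mysql.driver=com.mysql.cj.jdbc.Driver
mysql.student.sql.table=zy_ods.student
```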
ProcessDataInterface, the interface each per-table task implements:

```java
package com.lhw.inter;

import com.lhw.bean.CommonBean;
import org.apache.flink.streaming.api.datastream.DataStream;

/**
 * @ClassName ProcessDataInterface
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Interface implemented by each per-table processing task
 * @Date 2023/5/5 11:19
 */
public interface ProcessDataInterface {

    void process(DataStream<CommonBean> commonData);
}
```

CollectionStream, the main class that builds and runs the pipeline:

```java
package com.lhw.job;

import com.lhw.bean.CommonBean;
import com.lhw.config.CollectionConfig;
import com.lhw.map.CommonBeanMap;
import com.lhw.task.StudentTask;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.log4j.Logger;

/**
 * @ClassName CollectionStream
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Main class: builds and runs the sync pipeline
 * @Date 2023/5/4 13:09
 */
public class CollectionStream {

    private static Logger logger = Logger.getLogger(CollectionStream.class);

    public static void main(String[] args) {
        DebeziumSourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
                .hostname(CollectionConfig.config.getProperty("sqlserver.hostname"))
                .port(Integer.parseInt(CollectionConfig.config.getProperty("sqlserver.port")))
                .database(CollectionConfig.config.getProperty("sqlserver.database"))   // monitor sqlserver database
                .tableList(CollectionConfig.config.getProperty("sqlserver.tableList")) // monitor the student table
                .username(CollectionConfig.config.getProperty("sqlserver.username"))
                .password(CollectionConfig.config.getProperty("sqlserver.password"))
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // SQL Server incremental change stream
        DataStreamSource<String> streamSource = env.addSource(sourceFunction)
                .setParallelism(1); // use parallelism 1 for the source to keep message ordering
        // streamSource.print();

        // Convert the raw JSON change events into the generic CommonBean type
        DataStream<CommonBean> commonBeanStream = streamSource.map(new CommonBeanMap());

        // Keep only changes from the student table
        SingleOutputStreamOperator<CommonBean> filterDataStream = commonBeanStream.filter(
                new FilterFunction<CommonBean>() {
                    public boolean filter(CommonBean commonBean) throws Exception {
                        return commonBean.getSource().get("table").equals("student");
                    }
                }
        );

        // Business processing for the student table
        new StudentTask().process(filterDataStream);

        // Trigger job execution
        try {
            env.execute();
        } catch (Exception e) {
            logger.error("=== job execution failed ===");
            logger.error(e);
            e.printStackTrace();
        }
    }
}
```

CommonBeanMap, which parses the JSON change events into CommonBean:

```java
package com.lhw.map;

import com.alibaba.fastjson.JSON;
import com.lhw.bean.CommonBean;
import org.apache.flink.api.common.functions.MapFunction;

/**
 * @ClassName CommonBeanMap
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Converts the JSON change events captured from SQL Server into the generic CommonBean type
 * @Date 2023/5/5 10:48
 */
public class CommonBeanMap implements MapFunction<String, CommonBean> {

    public CommonBean map(String s) throws Exception {
        CommonBean commonBean = JSON.parseObject(s, CommonBean.class);
        return commonBean;
    }
}
```

StudentMap, which converts a CommonBean into a Row matching the student table:

```java
package com.lhw.map;

import com.lhw.bean.CommonBean;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.types.Row;

/**
 * @ClassName StudentMap
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Converts the generic CommonBean into a Row for the student table
 * @Date 2023/5/5 11:47
 */
public class StudentMap extends RichMapFunction<CommonBean, Row> {

    public Row map(CommonBean commonBean) throws Exception {
        Row row;
        if (commonBean.getOp().equals("c") || commonBean.getOp().equals("u") || commonBean.getOp().equals("r")) {
            // insert / update / snapshot read: all four columns come from the "after" image
            row = new Row(4);
            row.setField(0, Integer.parseInt(commonBean.getAfter().get("id")));
            row.setField(1, commonBean.getAfter().get("name"));
            row.setField(2, Integer.parseInt(commonBean.getAfter().get("age")));
            row.setField(3, Integer.parseInt(commonBean.getAfter().get("mark")));
        } else {
            // delete: only the primary key from the "before" image is needed
            row = new Row(1);
            row.setField(0, Integer.parseInt(commonBean.getBefore().get("id")));
        }
        return row;
    }
}
```

SinkMysql, a generic MySQL sink that binds Row fields to a parameterized statement:

```java
package com.lhw.sink;

import com.lhw.util.DbUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

import java.sql.Connection;
import java.sql.PreparedStatement;

/**
 * @ClassName SinkMysql
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Generic sink that writes Rows to MySQL
 * @Date 2023/5/5 9:52
 */
public class SinkMysql extends RichSinkFunction<Row> {

    String sql;

    public SinkMysql(String sql) {
        this.sql = sql;
    }

    Connection conn = null;
    PreparedStatement ps = null;

    // Open the connection and prepare the statement
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DbUtil.getConnByJdbc();
        ps = conn.prepareStatement(sql);
    }

    // Bind the Row fields to the statement parameters and execute
    @Override
    public void invoke(Row value, Context context) throws Exception {
        for (int i = 0; i < value.getArity(); i++) {
            ps.setObject(i + 1, value.getField(i));
        }
        ps.executeUpdate();
    }

    // Release JDBC resources
    @Override
    public void close() throws Exception {
        if (ps != null) {
            ps.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}
```
StudentTask, which splits the stream into upserts and deletes and wires both to MySQL:

```java
package com.lhw.task;

import com.lhw.bean.CommonBean;
import com.lhw.config.CollectionConfig;
import com.lhw.inter.ProcessDataInterface;
import com.lhw.map.StudentMap;
import com.lhw.sink.SinkMysql;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @ClassName StudentTask
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Business processing for the student table (feasibility test)
 * @Date 2023/5/5 11:14
 */
public class StudentTask implements ProcessDataInterface {

    public void process(DataStream<CommonBean> commonData) {
        // Side-output tag for delete events
        final OutputTag<CommonBean> deleteOpt = new OutputTag<CommonBean>("deleteOpt", TypeInformation.of(CommonBean.class));

        // Split the stream: upserts stay in the main output, deletes go to the side output
        SingleOutputStreamOperator<CommonBean> processData = commonData.process(
                new ProcessFunction<CommonBean, CommonBean>() {
                    @Override
                    public void processElement(CommonBean commonBean, Context context, Collector<CommonBean> collector) throws Exception {
                        if (commonBean.getOp().equals("c") || commonBean.getOp().equals("u") || commonBean.getOp().equals("r")) {
                            // insert, update or snapshot read
                            collector.collect(commonBean);
                        } else {
                            // delete
                            context.output(deleteOpt, commonBean);
                        }
                    }
                }
        );

        // REPLACE INTO relies on the primary key on id: an existing row is replaced, a new one is inserted
        String upsertSql = "replace into %s value(?,?,?,?)";
        String deleteSql = "delete from %s where id=?";

        // insert / update
        processData.map(new StudentMap())
                .addSink(new SinkMysql(String.format(upsertSql, CollectionConfig.config.getProperty("mysql.student.sql.table"))));

        // delete
        processData.getSideOutput(deleteOpt).map(new StudentMap())
                .addSink(new SinkMysql(String.format(deleteSql, CollectionConfig.config.getProperty("mysql.student.sql.table"))));
    }
}
```

DbUtil, the database utility class (the original page is cut off partway through this class):

```java
package com.lhw.util;

import com.lhw.config.CollectionConfig;

import java.sql.*;
import java.util.*;

/**
 * @ClassName DbUtil
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Database utility class
 * @Date 2023/5/5 9:55
 */
public class DbUtil {

    public static Map query(String key, String sql) {
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet rs = null;
        Map table = new HashMap();
        String[] keys = key.split(",");
        try {
            connection = DbUtil.getConnByJdbc();
            preparedStatement = connection.prepareStatement(sql);
            rs = preparedStatement.executeQuery();
            ResultSetMetaData rsm = rs.getMetaData();
            int colNum = rsm.getColumnCount();
            while (rs.next()) {
                Map row = new HashMap();
                // ... the remainder of DbUtil, including getConnByJdbc(), is cut off in the original page
```
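The original post is truncated inside DbUtil.query, and the getConnByJdbc method used by SinkMysql is missing. DbUtil.query is not called anywhere in the pipeline above, so only getConnByJdbc matters here. Below is a minimal sketch of a plausible completion, assuming the MySQL connection settings live in dev/config.properties; only mysql.driver appears in the original code, so the mysql.url, mysql.username and mysql.password keys are assumptions:

```java
package com.lhw.util;

import com.lhw.config.CollectionConfig;

import java.sql.Connection;
import java.sql.DriverManager;

/**
 * Sketch of the missing part of DbUtil. Property names other than
 * mysql.driver are assumptions; adjust them to match your config file.
 */
public class DbUtilSketch {

    public static Connection getConnByJdbc() throws Exception {
        // mysql.driver is referenced in CollectionConfig; the other keys are assumed
        Class.forName(CollectionConfig.config.getProperty("mysql.driver"));
        return DriverManager.getConnection(
                CollectionConfig.config.getProperty("mysql.url"),
                CollectionConfig.config.getProperty("mysql.username"),
                CollectionConfig.config.getProperty("mysql.password"));
    }
}
```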

