FLINK CDC近期使用小结

您所在的位置:网站首页 CDC数据库开启 FLINK CDC近期使用小结

FLINK CDC近期使用小结

2023-06-10 03:23| 来源: 网络整理| 查看: 265

1、Flink cdc是什么?

        CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术.

2、Flink cdc 安装部署 2.1、安装包要求 flink版本: flink-1.13.0-bin-scala_2.11.tgz java版本:jdk-8u271-linux-x64.tar.gz mysql版本:mysql-5.7.40-linux-glibc2.12-x86_64.tar.gz 2.2、 部署安装 2.2.1、安装过程 1、解压flink cdc安装包 tar -zxvf flink-1.13.0-bin-scala_2.11.tgz 2、安装Java以及Mysql 百度自行解决,注意配置环境变量⚠️ 3、flink cdc中java配置 cd flink-1.13.0 vim conf/flink-conf.yaml 添加配置:env.java.home=/usr/local/java/jdk1.8.0_271 ---- 版本号需要对应 4、上传驱动包,放在flink1.13.0/lib 目录下 注意⚠️:以上过程仅使用mysql数据源,若涉及其他数据源需要对应的驱动包 flink-connector-jdbc_2.11-1.13.6.jar flink-sql-connector-mysql-cdc-2.1.0.jar mysql-connector-java-8.0.27.jar 5、启动服务 注意⚠️:服务启动后可通过8081端口查看是否正常 /bin/start-cluster.sh -- 启动集群,如果安装包或者配置改变,需要重启服务 /bin/sql-client.sh --启动SQL客户端 2.2.2、注意事项 1、数据库用户权限,最好不用root角色,但是需要相应的权限。如果发生权限报错,可百度或者评论问我 2、数据库需要开启bin-log,并且是row模式 3、各个软件包版本、驱动版本对应关系,否则会启动报错 4、java版本需要在1.7以上,mysql版本测试适配是5.7 2.2.3、flink cdc 配置

进入/conf       

vim  flink-conf

#任务存储,建议调大一点 jobmanager.memory.process.size: 3200m #任务存储,建议调大一点 taskmanager.memory.process.size: 3200m #任务槽数:不同任务所需槽,根据实际情况配置 #默认是1 - 如果配置1,导致无法执行2个或者2个以上任务 taskmanager.numberOfTaskSlots: 50 #任务并行度:涉及到读取效率,默认是1 parallelism.default: 5 #checkponit、savepoint存储方式,这种能降低内存使用,面临大数据的问题;做测试用默认就可以 #注意:需要在/home/admin 下建立 /state/ckp /state/savepoint的两个文件夹 state.backend: rocksdb state.checkpoints.dir: file:///home/admin/state/ckp state.savepoints.dir: file:///home/admin/state/savepoint 3、读写测试 3.1、建Mysql表 首先需要在mysql中建立相应的映射表 如:static_pay_order_his CREATE TABLE `static_pay_order_his` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键', `all_amount` bigint(20) comment '历史成交总金额', `all_count` bigint(20) comment '历史成交总数', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=106 DEFAULT CHARSET=utf8 COMMENT='支付平台统计值 --- 历史' 建立:pay_order表 CREATE TABLE `pay_order` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=106 DEFAULT CHARSET=utf8 COMMENT='支付订单' 3.2、建cdc映射表 #设置ck快照时间,时间越长越能降低存储消耗 SET execution.checkpointing.interval = 3000s; #建立表单表 CREATE TABLE pay_order ( id bigint NOT NULL, amount bigint, state STRING, isv_no STRING, create_time timestamp, update_time timestamp, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'scan.incremental.snapshot.enabled' = 'true', ---如果配置了false,则会锁表 'connector' = 'mysql-cdc', ---cdc是source,读表 'hostname' = 'localhost', 'port' = '3306', 'username' = '', 'password' = '', 'database-name' = '', 'table-name' = '', 'server-id' = '5502' ); ---交易总金额 CREATE TABLE static_pay_order_his ( id bigint, all_amount bigint, all_count bigint, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', --- jdbc则是sink,写表 'url' = '', 'username' = '', 'password' = '', 'table-name' = 'static_pay_order_his' ); ---写sink表 insert into static_pay_order_his (id, all_amount, all_count) select cast(replace(cast(current_date as string), '-', '') as bigint) as id, sum(amount) as all_amount, count(*) as all_count from pay_order where isv_no='62f9e7d9e4b0cb4057de15ae' and cast(create_time as string) >= '2023-06-01' ; 3.3、运行状态查看

 4、应用场景

1、实时大屏(双11大屏)

2、数据湖 & 实时数仓(olap)



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3