中间件Canal之Canal简单使用

您所在的位置:网站首页 canal使用 中间件Canal之Canal简单使用

中间件Canal之Canal简单使用

#中间件Canal之Canal简单使用| 来源: 网络整理| 查看: 265

一. 简单介绍

Canal是Java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,Canal主要支持了MySQL的Binlog解析,解析完成后才能利用Canal Client来处理获得的相关数据。

二. MySQL的Binlog 2.1. Binlog是什么?

MySQL的二进制可以说MySQL最重要的日志了,它记录了所有DDL和DML(除了数据查询语句)语句,以事件形式记录,还包括所有执行的消耗时间,MySQL的二进制日志是事务安全型的。

一般来说开启二进制日志大概会有1%的性能损耗。二进制有两个最重要的使用场景:

MySQL Replication在Master端开启Binlog,Master把它的二进制日志传递给Slaves来达到Master-Slave数据一致性的目的; 数据恢复,通过使用MySQL Binlog工具来恢复数据

二进制日志包括两类文件:二进制索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的DDL和DML(除数据查询语句)语句事件。

2.2. Binlog的分类

MySQL Binlog的格式有三种,分别是statement、mixed、row。在配置文件中可以选择配置binlog_format=statement|mixed|row。三种格式的区别:

statement:语句级,binlog会记录每一次执行写操作的语句。相对row模式节省空间,但是可能产生不一致性,比如“update tt set create_time=now()”,如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同;优点是节省空间;缺点就是可能造成数据不一致; row:行级,binlog会记录每次操作后每行记录的变化;优点是保持数据的绝对一致性(因为不管SQL是什么,引用了什么函数,它只记录执行后的结果);缺点是占用空间大。 mixed:statement的升级版,一定程度上解决了,因为一些情况而造成的statement模式不一致问题,默认还是statement,在某些情况下譬如:当函数包含UUID()时、包含auto_increment字段的表被更新时、执行insert delayed语句时、用duf时,会按照row的方式进行处理;优点是节省空间,同时兼顾了一定的一致性;缺点是还是存在极个别的情况依旧会造成数据不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。

所有从上面比较来看Canal想做监控分析,选择row格式比较合适。

三. 工作原理 3.1. MySQL主从复制过程

简单过程如下:

Master主库将改变记录,写到二进制日志(Binary Log)中; Slave从库向MySQL Master发送dump协议,将Master主库的binary log events拷贝到它的中继日记(relay log); Slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库; 3.2. Canal的工作原理

Canal的工作原理很简单,就是把自己伪装成Slave,假装从Master复制数据。

四. 使用场景

在Canal使用场景如下:

原始场景:阿里Otter中间件的一部分,Otter是阿里用于异地数据库之间的同步框架,Canal是其中的一部分; 更新缓冲 抓取业务表的新增变化数据,用于制作实时统计 五. MySQL环境配置

关于MySQL环境的搭建可以参看上一篇文章:MySQL之主从复制集群搭建

六. 安装Canal

在canal中下载:github.com/alibaba/can…

canal-download.png

在服务器中创建一个canal的工作目录,解压到此目录:

tar -zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz -C canal-1.17 复制代码

解压之后我们只需要关注conf和bin这两个目录中的文件就可以:

canal-dir.png

注意:canal的通用配置,canal端口默认就是11111,修改canal的输出model,默认tcp。

在conf下面的example是表示一个实例,每个实例下面都有一个instance.properties;如果需要多个实例处理不同的MySQL数据,只需要拷贝出多个example,并对其重新命名;最后修改canal.properties中的canal.destinations=xxx,xxx1,xxx2。

接着配置example下的配置文件:

################################################# ## mysql serverId , v1.0.26+ will autoGen # 因为canal是模拟了一个slave所以这里需要配置slaveId canal.instance.mysql.slaveId=3 # enable gtid use true/false canal.instance.gtidon=false # position info # 配置master的地址 canal.instance.master.address=192.168.31.174:33306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # username/password # 配置master的账号密码 canal.instance.dbUsername=root canal.instance.dbPassword=123456 canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false 复制代码

接着启动Canal:

bin/startup.sh # 启动 bin/stop.sh # 关闭 复制代码 七. 监控测试

接着创建一个maven项目,并添加相关依赖:

com.alibaba.otter canal.client 1.1.6 com.alibaba.otter canal.protocol 1.1.6 复制代码

接着看一下如何连接Canal并监控库表数据变换:

import com.alibaba.fastjson2.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; /** * @author: Eternity.麒麟 * @description: canal简单使用 * @date: 2023/2/3 17:32 * @version: 1.0 */ public class CanalClient { public static void main(String[] args) throws InvalidProtocolBufferException { // 连接canal服务器 CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("192.168.31.174", 11111), "example", "", ""); System.out.println("开始监听......"); while (true) { // 连接 connector.connect(); // 订阅的库和表 connector.subscribe("cluster_db.*"); // 一次拉取的数据量 Message message = connector.get(10); List entries = message.getEntries(); if (!entries.isEmpty()) { for (CanalEntry.Entry entry : entries) { // 表名 String tableName = entry.getHeader().getTableName(); // 类型 CanalEntry.EntryType type = entry.getEntryType(); switch (type) { // 数据变更 case ROWDATA -> { // 获取数据 ByteString storeValue = entry.getStoreValue(); // 解析数据 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); // 事件类型 CanalEntry.EventType eventType = rowChange.getEventType(); // 获取行数据 List datasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : datasList) { JSONObject beforeData = new JSONObject(); // 变更之前数据 rowData.getBeforeColumnsList().forEach(item -> beforeData.put(item.getName(), item.getValue())); JSONObject afterData = new JSONObject(); // 变更之后数据 rowData.getAfterColumnsList().forEach(item -> afterData.put(item.getName(), item.getValue())); System.out.println("表名: " + tableName + ", 事件类型: " + eventType + ", 变更之前: " + beforeData + ", 变更之后: " + afterData); } } default -> System.out.println("当前操作类型为:" + type); } } } } } } 复制代码

启动之后,我们在表中进行增删改查,对应的canal监听结果如下:

canal-listen.png



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3