canal 监听同步指定数据库,所有表

您所在的位置:网站首页 监听数据库表的变化 canal 监听同步指定数据库,所有表

canal 监听同步指定数据库,所有表

2023-11-14 03:06| 来源: 网络整理| 查看: 265

canal 监听同步指定数据库,所有表

因为工作需求,需要用到数据库同步,又从网上找了一些发现都有些问题,所以自己弄好之后写一篇总结,及配置步骤吧

先将 MySQL配置成 bin-log模式给MySQL配置canal用户下载 canal工具(在这里我用的是:canal.deployer-1.1.5-SNAPSHOT)我会把工具上传到我的资源(免费的)配置instance.properties 配置文件编写java api 开始配置 配置MySQL bin-log模式 先 window+R 唤出 输入 services.msc 然后点击确定 在这里插入图片描述 找到MySQL服务右键属性,找到MySQL地址,因为我这配置了默认的my.ini文件,没配置应该是一个MySQL的地址找到里面的my.ini文件 在这里插入图片描述

找到my.ini文件进入这是我自己的地址 添加配置 log-bin=mysql-bin binlog-format=ROW

注意server-id之前有没有,没有就添加一个

server-id=1 然后保存,在刚才唤出的服务里重启MySQL服务 在这里插入图片描述

给MySQL配置canal用户

CREATE USER canal IDENTIFIED BY ‘canal’; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’; FLUSH PRIVILEGES;

下载canal 程序包 canal软件包下载地址 https://download.csdn.net/download/Angzush/12894366在这里插入图片描述

4.配置 instance.properties文件,注意 红框的文件夹名后面配置会用到 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述

canal.instance.defaultDatabaseName = 你默认监听的数据库 canal.instance.filter.regex = 正则配置的规则 我这里配置的是 data_resource_update_platform 数据库下所有的表

mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\) 常见例子: 所有表:.* or .\… canal schema下所有表: canal\…* canal下的以canal打头的表:canal\.canal.* canal schema下的一张表:canal.test1 多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔) 注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解 析sql,所以无法准确提取tableName进行过滤)

我当时是这块出了问题,一直监听的是整个MySQL服务器,不是我配置的数据库

最重要部分

canal instance启动时,默认加载instance.properties的canal.instance.filter.regex参数,之后会根据conf/canal/meta.dat文件filter值更新过滤规则。当客户端调用CanalConnector.subscribe(String filter)方法时,instance再次用filter参数更新过滤规则。

所以当你只关心部分库表更新时,设置了canal.instance.filter.regex,一定不要在客户端调用CanalConnector.subscribe(".\…"),不然等于没设置canal.instance.filter.regex。 如果一定要调用CanalConnector.subscribe(".\…"),那么可以设置instance.properties的canal.instance.filter.black.regex参数添加黑名单,过滤非关注库表。

你需要将 meta.dat 文件中的 filter对应的正则 改成 你配置的那一个我的是:data_resource_update_platform\…* 在这里插入图片描述 在这里插入图片描述

然后再启动 bin下面的 startup.bat 文件 在这里插入图片描述 接着查看日志 这样就启动成功了 在这里插入图片描述

java API 代码 package org.bigdata.framework.utils; import java.net.InetSocketAddress; import java.util.List; import javax.validation.constraints.NotNull; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.alibaba.otter.canal.protocol.Message; /** * @param * @author zcs * @date 2020/09/27 * @description * @return */ public class ClientSample { public static void main(String args[]) { // 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal");//这里的 example 是上文中提到的文件名 int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmtryCount = 1200; while (emptyCount emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(@NotNull List entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(@NotNull List columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3