Canal监控数据库数据变化

您所在的位置:网站首页 监听数据库变更 Canal监控数据库数据变化

Canal监控数据库数据变化

#Canal监控数据库数据变化| 来源: 网络整理| 查看: 265

最近有需求需要监控数据库mysql里某些表的数据变化,来做相应的业务变化,采用了 canal+kafka 的技术架构。官网github指导很全面

原理:

将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有 kafka 和 RocketMQ

安装:

zk

安装简单,就是解压修改配置zoo.cfg, 主要是端口(默认2181),log 和data 目录配置,是否集群根据情况来

tickTime=2000 initLimit=10 syncLimit=5 dataDir=/usr/local/zookeeper/data dataLogDir=/usr/local/zookeeper/logs clientPort=2181 # 集群添加 server.1=192.168.1.110:2888:3888 server.2=192.168.1.111:2888:3888 server.3=192.168.1.112:2888:3888 #master echo "1">/usr/local/zookeeper/data/myid #slave1 echo "2">/usr/local/zookeeper/data/myid #slave2 echo "3">/usr/local/zookeeper/data/myid

安装后看是否需要添加环境变量,然后测试是否安装成功

 kafka

wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz # 下载后解压,主要修改配置文件 server.properties # kafka 是需要依赖zookeeper的,需要配置zk地址 #然后配置监听端口 zookeeper.connect=192.168.1.110:2181 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://192.168.1.117:9092 #本机ip

安装后自己先测试一下,生产和消费,如果集成springboot , 这里先简单测试,看是否一切正常,最后再测试canal

canal

官网github 下载压缩包,安装后conf目录内包含

其中 canal.properties 和 example/instance.properties 配置需要修改

instance.properties

# 按需修改成自己的数据库信息 ################################################# ... canal.instance.master.address=192.168.1.20:3306 # username/password,数据库的用户名和密码 ... canal.instance.dbUsername = canal canal.instance.dbPassword = canal ... # table regex 需要监控的表名,多个逗号隔开 canal.instance.filter.regex=mytest.user # mq config #canal.mq.topic=example # 针对库名或者表名发送动态topic canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..* canal.mq.partition=3 # hash partition config #canal.mq.partitionsNum=3 #库名.表名: 唯一主键,多个表之间用逗号分隔 #canal.mq.partitionHash=mytest.person:id,mytest.role:id ################################################# #canal.mq.dynamicTopic 表达式说明 #canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔 # #例子1:test\\.test 指定匹配的单表,发送到以test_test为名字的topic上 #例子2:.*\\..* 匹配所有表,则每个表都会发送到各自表名的topic上 #例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上 #例子4:test\\.* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上 #例子5:test,test1\\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值 #为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table # #例子1: test:test\\.test 指定匹配的单表,发送到以test为名字的topic上 #例子2: test:.*\\..* 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下 #例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下 #例子4:testA:test\\.* 指定匹配的表达式,针对匹配的表会发送到testA的topic下 #例子5:test0:test,test1:test1\\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值 #大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力 # #canal.mq.partitionHash 表达式说明 #canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔 # #例子1:test\\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2 #例子2:.*\\..*:id 正则匹配,指定所有正则匹配的表对应的hash字段为id #例子3:.*\\..*:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找) #例子4: 匹配规则啥都不写,则默认发到0这个partition上 #例子5:.*\\..* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名 #按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题) #例子6: test\\.test:id,.\\..* , 针对test的表按照id散列,其余的表按照table散列 #注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)

canal.properties

# ... # 可选项: tcp(默认), kafka, RocketMQ canal.serverMode = kafka # ... # kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 canal.mq.servers = 127.0.0.1:6667 canal.mq.retries = 0 # flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 # flatMessage模式下请将该值改大, 建议50-200 canal.mq.lingerMs = 1 canal.mq.bufferMemory = 33554432 # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时 canal.mq.canalGetTimeout = 100 # 是否为flat json格式对象 canal.mq.flatMessage = false canal.mq.compressionType = none canal.mq.acks = all # kafka消息投递是否使用事务 canal.mq.transaction = false

启动后观看日志是否正常,日志文件有两处,

logs/canal/canal.log logs/example/example.log

然后查看kafka 消费情况  得到的消息如下

新增 { "data": [{ "id": "10028", "name": "18676671234", "account": "18676671234", "password": "e10adc3949ba59abbe56e057f20f883e", "status": "Y" }], "database": "dyly_vc", "es": 1563764670000, "id": 6, "isDdl": false, "mysqlType": { "id": "bigint(20)", "name": "varchar(255)", "account": "varchar(255)", "password": "varchar(255)", "status": "varchar(10)" }, "old": null, "pkNames": ["id"], "sql": "", "sqlType": { "id": -5, "name": 12, "account": 12, "password": 12, "status": 12 }, "table": "back_user", "ts": 1563764670842, "type": "INSERT" } 修改 { "data": [{ "id": "10027", "name": "修改名字", "account": "18126239509", "password": "e10adc3949ba59abbe56e057f20f883e", "status": "Y" }], "database": "dyly_vc", "es": 1563764853000, "id": 19, "isDdl": false, "mysqlType": { "id": "bigint(20)", "name": "varchar(255)", "account": "varchar(255)", "password": "varchar(255)", "status": "varchar(10)" }, "old": [{ "name": "测试" }], "pkNames": ["id"], "sql": "", "sqlType": { "id": -5, "name": 12, "account": 12, "password": 12, "status": 12 }, "table": "back_user", "ts": 1563764853305, "type": "UPDATE" } 删除 { "data": [{ "id": "10028", "name": "18676671234", "account": "18676671234", "password": "e10adc3949ba59abbe56e057f20f883e", "status": "Y" }], "database": "dyly_vc", "es": 1563764949000, "id": 27, "isDdl": false, "mysqlType": { "id": "bigint(20)", "name": "varchar(255)", "account": "varchar(255)", "password": "varchar(255)", "status": "varchar(10)" }, "old": null, "pkNames": ["id"], "sql": "", "sqlType": { "id": -5, "name": 12, "account": 12, "password": 12, "status": 12 }, "table": "back_user", "ts": 1563764949610, "type": "DELETE" }

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3