# Hands-on: Kafka Connect and the Debezium MySQL connector
Kafka Connect is a scalable, reliable tool for streaming data between Apache Kafka and other systems. Through connectors, it makes it simple and fast to move large data sets into and out of Kafka.

## 1. Installation and configuration

### 1.1 Download and configure Kafka

This article uses kafka_2.13-2.8.0.tgz; download it yourself and extract it. My root directory is `D:\soft\kafka_2.13-2.8.0`. Edit the following files under `D:\soft\kafka_2.13-2.8.0\config`.

`zookeeper.properties`:

```properties
dataDir=D:/soft/kafka_2.13-2.8.0/data/zookeeper
```

`server.properties`:

```properties
log.dirs=D:/soft/kafka_2.13-2.8.0/data/kafka-logs
```

Under `D:\soft\kafka_2.13-2.8.0\bin\windows`, create a file named `a.bat` with the following contents:

```bat
start zookeeper-server-start.bat ../../config/zookeeper.properties
rem If the sleep command is unavailable, use: ping -n 10 127.0.0.1>nul
sleep 5
start kafka-server-start.bat ../../config/server.properties
```

This script starts ZooKeeper first, waits 5 seconds, then starts Kafka.

### 1.2 Download the kafka-map web UI

kafka-map is a clean, good-looking, and powerful Kafka web management tool. Download it from github.com/dushixiang/… and extract it to `D:\soft\kafka_2.13-2.8.0\kafka-map`. In that directory, create `startKafkaMap.bat` with the following contents:

```bat
java -jar kafka-map.jar
```

### 1.3 Start and verify

Run `a.bat`, then `startKafkaMap.bat`, and open http://localhost:8080/ . The default account and password are both `admin`.
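The fixed `sleep 5` works, but a more robust way to sequence startup is to wait until ZooKeeper's port actually accepts connections before launching Kafka. Below is a minimal sketch in Python; the helper is not part of Kafka, and 2181 is assumed to be ZooKeeper's default client port:

```python
import socket
import time

def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Poll until host:port accepts TCP connections, or give up after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means the service is listening.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.5)  # not up yet; retry shortly
    return False

# Example: block until ZooKeeper is reachable before starting Kafka.
# if wait_for_port("127.0.0.1", 2181):
#     subprocess.Popen(["kafka-server-start.bat", "../../config/server.properties"])
```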
Add the cluster in kafka-map.

## 2. Running the built-in FileStream example

First, let's run the example that ships with Kafka Connect: FileStreamSource reads `server.properties` into a Kafka topic, then FileStreamSink consumes that topic and writes the contents to `server.properties1`.

### 2.1 Configure Connect

In `D:\soft\kafka_2.13-2.8.0\config\connect-distributed.properties` set:

```properties
bootstrap.servers=localhost:9092
```

### 2.2 Start Connect

Under `D:\soft\kafka_2.13-2.8.0\bin\windows`, first create `aConnect.bat` with the following contents:

```bat
start zookeeper-server-start.bat ../../config/zookeeper.properties
sleep 5
start kafka-server-start.bat ../../config/server.properties
sleep 5
start connect-distributed.bat ../../config/connect-distributed.properties
```

### 2.3 Check the environment

Send a GET request to http://localhost:8083/ to check the worker status, and to http://localhost:8083/connector-plugins to list all installed plugins.

FileStreamSource reads `D:/soft/kafka_2.13-2.8.0/config/server.properties` line by line into the `kafka-config-topic` topic. POST to http://localhost:8083/connectors :

```json
{
  "name": "load-kafka-config",
  "config": {
    "connector.class": "FileStreamSource",
    "file": "D:/soft/kafka_2.13-2.8.0/config/server.properties",
    "topic": "kafka-config-topic"
  }
}
```

Check its status at http://localhost:8083/connectors/load-kafka-config/status , and click through in kafka-map to inspect the topic contents.

FileStreamSink consumes `kafka-config-topic` and writes the contents to `D:/soft/kafka_2.13-2.8.0/config/server.properties1`. POST to http://localhost:8083/connectors :

```json
{
  "name": "dump-kafka-config",
  "config": {
    "connector.class": "FileStreamSink",
    "file": "D:/soft/kafka_2.13-2.8.0/config/server.properties1",
    "topics": "kafka-config-topic"
  }
}
```

Once it has run, a `server.properties1` file appears under `D:\soft\kafka_2.13-2.8.0\config`, with the same contents as `server.properties`.

Clean up by sending DELETE requests:

```
DELETE http://localhost:8083/connectors/load-kafka-config
DELETE http://localhost:8083/connectors/dump-kafka-config
```

Listing the active connectors at http://localhost:8083/connectors now returns an empty result.

## 3. Debezium MySQL connector

In this example we use the Debezium MySQL connector to monitor the MySQL binlog and publish change events to Kafka topics.

### 3.1 Prepare MySQL

To keep things simple, we skip creating a dedicated replication user and use `root` directly. The following parameters are required:

```ini
[mysqld]
server-id = 8023
port = 8023
log-bin = D:/soft/mysql/mysql8023/data1/8023/binlog/mysql-bin
transaction-isolation = READ-COMMITTED
binlog_format = ROW
gtid_mode = ON
enforce_gtid_consistency = ON
```
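These settings can be sanity-checked before starting the connector by comparing the server's `SHOW VARIABLES` output against the expected values. A hypothetical pre-flight helper, not part of Debezium; the `REQUIRED` map only covers the parameters this article sets:

```python
# Settings this article requires for binlog-based capture.
REQUIRED = {
    "binlog_format": "ROW",
    "gtid_mode": "ON",
    "enforce_gtid_consistency": "ON",
}

def check_binlog_settings(variables: dict) -> list:
    """Return (name, expected, actual) tuples for every non-conforming setting.

    `variables` is a dict built from `SHOW VARIABLES` rows,
    e.g. dict(cursor.fetchall()).
    """
    problems = []
    for name, expected in REQUIRED.items():
        actual = variables.get(name)
        if actual is None or actual.upper() != expected:
            problems.append((name, expected, actual))
    return problems
```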
Also set `default-time-zone = '+8:00'` in the same `[mysqld]` section. Some optional tuning parameters are described in the official documentation: debezium.io/documentati…

With that in place, start MySQL; this article uses MySQL 8.0.29. Prepare a test database and table:

```sql
CREATE DATABASE `test1`;
CREATE TABLE `demo` (
  `id` INT(10) NOT NULL,
  `name` VARCHAR(50) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
);
-- insert test data
INSERT INTO `demo` (`id`, `name`) VALUES (111, '222');
```

### 3.2 Prepare Debezium

Download the Debezium Kafka Connect plugin from repo1.maven.org/maven2/io/d… , or browse for the version you need at repo1.maven.org/maven2/io/d… . Extract `debezium-connector-mysql-2.0.0.Alpha1-plugin.tar.gz` to `D:\soft\kafka_2.13-2.8.0\plugin`, then add the following parameter to `connect-distributed.properties`; note that the path must not include the `debezium-connector-mysql` directory itself:

```properties
plugin.path=D:/soft/kafka_2.13-2.8.0/plugin/
```

After this change, restart Kafka and Connect and check the startup log to confirm the plugin was loaded:

```
[2022-05-11 16:09:48,408] INFO Registered loader: PluginClassLoader{pluginLocation=file:/D:/soft/kafka_2.13-2.8.0/plugin/debezium-connector-mysql/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
[2022-05-11 16:09:48,410] INFO Added plugin 'io.debezium.connector.mysql.MySqlConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
```

### 3.3 Register the connector

Check the available plugins, then send a POST to http://localhost:8083/connectors with the connector configuration as the request body:

```json
{
  "name": "conn-8023",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "8023",
    "database.user": "root",
    "database.password": "123456",
    "database.server.id": "8023",
    "database.server.name": "local8023",
    "database.include.list": "test1",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.local8023",
    "include.schema.changes": "true"
  }
}
```

With the latest release, debezium-core-2.0.0.Final, this configuration fails:

```
[2022-11-23 11:31:30,326] INFO [conn-8023|task-0] No previous offsets found (io.debezium.connector.common.BaseSourceTask:339)
[2022-11-23 11:31:30,329] ERROR [conn-8023|task-0] The 'schema.history.internal.kafka.topic' value is invalid: A value is required (io.debezium.storage.kafka.history.KafkaSchemaHistory:1906)
```
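These "A value is required" errors stem from configuration keys that changed in Debezium 2.0. Upgrading an existing pre-2.0 connector config can be scripted; a sketch, where the key mapping reflects only the renames shown in this article:

```python
# Debezium 2.0 renamed the schema-history settings, and `topic.prefix` is new
# and required. This sketch rewrites a pre-2.0 config dict accordingly.
RENAMES = {
    "database.history.kafka.topic": "schema.history.internal.kafka.topic",
    "database.history.kafka.bootstrap.servers": "schema.history.internal.kafka.bootstrap.servers",
}

def migrate_config(old: dict, topic_prefix: str) -> dict:
    """Return a copy of `old` with renamed keys and a `topic.prefix` added."""
    new = {RENAMES.get(key, key): value for key, value in old.items()}
    new.setdefault("topic.prefix", topic_prefix)
    return new
```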
followed by the same "A value is required" error for `schema.history.internal.kafka.bootstrap.servers`.

The cause: 2.0 adds a new required parameter, `topic.prefix`, and the following two parameters were renamed:

- `database.history.kafka.topic` → `schema.history.internal.kafka.topic`
- `database.history.kafka.bootstrap.servers` → `schema.history.internal.kafka.bootstrap.servers`

The updated configuration:

```json
{
  "name": "conn-8023",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "8023",
    "database.user": "root",
    "database.password": "123456",
    "database.server.id": "8023",
    "database.server.name": "local8023",
    "include.schema.changes": "true",
    "database.include.list": "test",
    "topic.prefix": "test_",
    "schema.history.internal.kafka.topic": "dbhistory.local8023",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092"
  }
}
```

The response:

```json
{
  "name": "conn-8023",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "8023",
    "database.user": "root",
    "database.password": "123456",
    "database.server.id": "8023",
    "database.server.name": "local8023",
    "include.schema.changes": "true",
    "database.include.list": "test",
    "topic.prefix": "test_",
    "schema.history.internal.kafka.topic": "dbhistory.local8023",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "name": "conn-8023"
  },
  "tasks": [],
  "type": "source"
}
```

Query the status at http://localhost:8083/connectors/conn-8023/status , which returns:

```json
{
  "name": "conn-8023",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.19.64.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.19.64.1:8083"
    }
  ],
  "type": "source"
}
```

### 3.4 Check the results

Use kafka-map to inspect the data in topic `local8023.test1.demo`. The initial snapshot produces a read event whose payload begins:

```json
{
  "schema": { ... },
  "payload": {
    "before": null,
    "after": { "id": 111, "name": "222" },
    "source": { ...
```
and the payload ends with `"op": "r"` (a snapshot read), `"ts_ms": 1652258099933`, and `"transaction": null`.

Now run some more SQL to test:

```sql
-- insert test data
INSERT INTO `demo` (`id`, `name`) VALUES (222, '新数据222');
UPDATE demo SET name='111修改后' WHERE id=111;
DELETE FROM demo WHERE id=222;
```

The insert event:

```json
"payload": {
  "before": null,
  "after": { "id": 222, "name": "新数据222" },
  "source": { ... },
  "op": "c",
  "ts_ms": 1652258788884,
  "transaction": null
}
```

The update event:

```json
"payload": {
  "before": { "id": 111, "name": "222" },
  "after": { "id": 111, "name": "111修改后" },
  "source": { ... },
  "op": "u",
  "ts_ms": 1652258788887,
  "transaction": null
}
```

The delete event:

```json
"payload": {
  "before": { "id": 222, "name": "新数据222" },
  "after": null,
  "source": { ... },
  "op": "d",
  "ts_ms": 1652258985267,
  "transaction": null
}
```

After the delete event, a tombstone event (a record with a null value) follows.
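A consumer of these change events typically dispatches on the `op` field shown above. A small sketch of that pattern; the helper and its names are illustrative, not part of Debezium:

```python
# Map Debezium `op` codes to actions; for deletes the row image is in
# `before`, for everything else it is in `after`.
OP_ACTIONS = {"r": "snapshot read", "c": "insert", "u": "update", "d": "delete"}

def summarize_event(payload: dict) -> dict:
    """Reduce a Debezium change-event payload to an action plus the row image."""
    action = OP_ACTIONS.get(payload["op"], "unknown")
    row = payload["after"] if payload["after"] is not None else payload["before"]
    return {"action": action, "row": row}

# The update event captured earlier in this section:
update_event = {
    "before": {"id": 111, "name": "222"},
    "after": {"id": 111, "name": "111修改后"},
    "op": "u",
    "ts_ms": 1652258788887,
    "transaction": None,
}
print(summarize_event(update_event))
# {'action': 'update', 'row': {'id': 111, 'name': '111修改后'}}
```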