canal+rabbitmq解决mysql与redis缓存数据一致性问题

您所在的位置:网站首页 怎么解决redis与数据库不一致问题 canal+rabbitmq解决mysql与redis缓存数据一致性问题

canal+rabbitmq解决mysql与redis缓存数据一致性问题

2023-11-01 17:32| 来源: 网络整理| 查看: 265

文章目录 1 mysql1.1 开启 MySQL的binlog1.2 重启mysql1.3 查看binlog是否已被开启1.4 修改密码策略1.5 新建canal用户并授权 2 rabbitmq2.1 拉取rabbitmq镜像2.2 运行rabbitmq镜像2.3 进入Rabbitmq Management 3 canal3.1 下载canal3.2 创建解压目录并解压3.3 修改配置文件3.3.1 conf/canal.properties3.3.2 conf/example/instance.properties 3.4 启动canal 4 Spring Boot集成rabbitmq4.1 在pom.xml中添加maven依赖4.2 yml文件4.3 RabbitConfig配置文件4.4 CanalMessage.java4.5 RabbitmqListener.java

1 mysql 1.1 开启 MySQL的binlog vi /etc/my.cnf log-bin=mysql-bin #开启 binlog binlog-format=ROW #选择 ROW 模式 server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复

ROW:模式 除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但会占用较多的空间。 STATEMENT:模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况; MIX:模式比较灵活的记录,理论上说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;

1.2 重启mysql systemctl restart mysqld 1.3 查看binlog是否已被开启 SHOW VARIABLES LIKE 'log_bin';

在这里插入图片描述

1.4 修改密码策略 set global validate_password_policy=LOW; set global validate_password_length=5; 1.5 新建canal用户并授权 DROP USER 'canal'@'%'; CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal'; FLUSH PRIVILEGES; 2 rabbitmq

因为使用Docker来安装rabbitmq比较方便,所以本文选用Docker进行安装,未安装Docker的请移步Docker从零基础入门到使用。

2.1 拉取rabbitmq镜像 docker pull rabbitmq:3.9.16-management 2.2 运行rabbitmq镜像 docker run -d --name rabbitmq-test --hostname my-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.9.16-management 2.3 进入Rabbitmq Management

在浏览器地址栏中输入 ip:15672,默认Username和Password均为guest 在这里插入图片描述

3 canal 3.1 下载canal

下载地址 在这里插入图片描述 下载之后通过XFTP或WinSCP上传到centos

3.2 创建解压目录并解压 创建解压目录 mkdir /tmp/canal 解压 tar -zxvf canal.deployer-1.1.5.tar.gz -C /tmp/canal/ 3.3 修改配置文件 3.3.1 conf/canal.properties vi /tmp/canal/conf/canal.properties canal.serverMode = rabbitMQ #设置服务器模式为rabbitMQ rabbitmq.host =127.0.0.1 #ip rabbitmq.virtual.host = / #虚拟主机 rabbitmq.exchange = mysql #交换机名称 rabbitmq.username = guest #用户名 rabbitmq.password =guest #密码 rabbitmq.deliveryMode = direct #交换机类型

在这里插入图片描述 在这里插入图片描述 在这里插入图片描述

3.3.2 conf/example/instance.properties vi /tmp/canal/conf/example/instance.properties ## mysql serverId 不能与mysql的server_id一样 canal.instance.mysql.slaveId = 1234 #mysql数据库ip:port canal.instance.master.address = 127.0.0.1:3306 #rabbitmq中exchange与queue进行绑定的路由键 canal.mq.topic=mysql-binlog #mysql数据库账号密码 canal.instance.dbUsername = canal.instance.dbPassword = 3.4 启动canal 进入canal启动目录 cd /tmp/canal/bin

假如服务器内存小,则修改启动文件startup.sh的jvm参数,否则会出现canal无法启动问题,或者是运行着出现com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException

vi startup.sh

在这里插入图片描述

启动canal ./startup.sh 4 Spring Boot集成rabbitmq 4.1 在pom.xml中添加maven依赖 org.springframework.boot spring-boot-starter-amqp 4.2 yml文件 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest publisher-confirms: true # 开启confirm模式,确保消息成功发送到交换器 listener: type: simple # 设置容器类型 simple: default-requeue-rejected: false # basicReject或basicNack后不重新入队,使其进入死信队列 acknowledge-mode: manual # 选择使用手动ack,不使用自动ack retry: enabled: true # 开启消息消费失败重试 max-attempts: 5 # 重试次数 initial-interval: 3000 # 重试时间间隔 4.3 RabbitConfig配置文件 import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitConfig { private final Logger logger = LoggerFactory.getLogger(getClass()); @Bean public Exchange exchange() { // 创建一个Direct Exchange,设置为持久化,不自动删除 return new DirectExchange("mysql", true, false); } @Bean public Exchange deadLetterExchange() { // 死信Exchange return new DirectExchange("dead.letter.exchange", true, false); } @Bean public Queue queue() { /** * durable=true 持久化queue的元数据 * exclusive = false 队列不独占,允许多个消费者访问 * autoDelete = false 当最后一个消费者断开连接之后队列是否自动被删除 */ Map args = new HashMap(2); // 配置当前队列绑定的死信交换器 args.put("x-dead-letter-exchange", "dead.letter.exchange"); // 配置当前队列的死信队列路由key,如果不设置默认为当前队列的路由key args.put("x-dead-letter-routing-key", "dead.letter.routing.key"); return new Queue("binlog", true, false, false, args); } @Bean public Queue deadLetterQueue() { // 死信Queue return new Queue("dead.letter.queue", true, false, false); } @Bean public Binding binding() { // 将上面的mysql Exchange与binlog Queue以"mysql-binlog"为路由键进行绑定,无参数 return BindingBuilder .bind(queue()) .to(exchange()) .with("mysql-binlog") .noargs(); } @Bean public Binding deadLetterBinding() { // 绑定死信Queue与死信Exchange return BindingBuilder .bind(deadLetterQueue()) .to(deadLetterExchange()) .with("dead.letter.routing.key") .noargs(); } @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); // 开启强制委托模式 rabbitTemplate.setMandatory(true); // ack=true表示Exchange接收到了消息 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { logger.info("消息已发送到Exchange"); } else { logger.error("消息未能发送到Exchange,{}", cause); } } ); // 当消息发送给Exchange后,Exchange路由到Queue失败时会执行ReturnCallBack rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> logger.error("mq消息不可达,message:{},replyCode:{},replyText:{},exchange:{},routing:{}", message, replyCode, replyText, exchange, routingKey) ); return rabbitTemplate; } } 4.4 CanalMessage.java import lombok.Data; import java.util.List; import java.util.Map; @Data public class CanalMessage { /** * 更新后的数据 */ private List data; /** * 数据库名 */ private String database; /** * binlog executeTime, 执行耗时 */ private long es; /** * id */ private int id; /** * 标识是否是ddl语句,比如create table/drop table */ private boolean isDdl; /** * 更新前的有变更的列的数据 */ private List old; /** * 主键字段名 */ private List pkNames; /** * ddl/query的sql语句 */ private String sql; /** * 表名 */ private String table; /** * dml build timeStamp */ private long ts; /** * 事件类型:INSERT/UPDATE/DELETE */ private String type; } 4.5 RabbitmqListener.java import com.alibaba.fastjson.JSON; import com.company.springboot.canal.CanalMessage; import com.company.springboot.sys.entity.User; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; import java.util.Random; @Component public class RabbitmqListener { @Resource private RedisTemplate redisTemplate; private final Logger logger = LoggerFactory.getLogger(getClass()); @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "binlog"), exchange = @Exchange(value = "mysql"))) public void businessQueue(@Payload byte[] message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException { try { // canal发送到rabbitmq的消息默认为二进制字节流,无法看懂,所以将二进制字节流转换为String类型 String realMessage = new String(message, StandardCharsets.UTF_8); // 将String转换为对象类型 CanalMessage canalMessage = JSON.parseObject(realMessage, CanalMessage.class); // 只针对test数据库中的user表 if ("test".equals(canalMessage.getDatabase()) && "user".equals(canalMessage.getTable())) { if ("UPDATE".equals(canalMessage.getType()) || "INSERT".equals(canalMessage.getType())) { // userList不能直接等于canalMessage.getData(),否则会出现类型无法转换问题 List userList = JSON.parseArray(JSON.parseObject(realMessage).getString("data"), User.class); for (User user : userList) { logger.info(user.toString()); redisTemplate.opsForValue().set("user::" + user.getId(), user, Duration.ofSeconds(60 * 60 + new Random().nextInt(60 * 10))); } } else if ("DELETE".equals(canalMessage.getType())) { List userList = JSON.parseArray(JSON.parseObject(realMessage).getString("data"), User.class); for (User user : userList) { redisTemplate.delete("user::" + user.getId()); } } } // 手动ack,确认消息已被消费 channel.basicAck(deliveryTag, false); } catch (Exception e) { // requeue=false 表示被拒绝的消息进入死信队列 channel.basicNack(deliveryTag, false, false); e.printStackTrace(); } } @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "dead.letter.queue"), exchange = @Exchange(value = "dead.letter.exchange"))) public void deadLetterQueue(@Payload byte[] message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { logger.info("死信队列业务逻辑"); } }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3