消息队列的坑(重复消费、顺序消费、消息丢失)

您所在的位置:网站首页 最常用的消息队列有哪些方法呢 消息队列的坑(重复消费、顺序消费、消息丢失)

消息队列的坑(重复消费、顺序消费、消息丢失)

#消息队列的坑(重复消费、顺序消费、消息丢失)| 来源: 网络整理| 查看: 265

一.常用的一些消息队列

1.rabbitmq erlang语言开发,时效性最高

2.rocketmq 吞吐量高,时效性高,实现了事务消息,但在大数据方面需要自己写代码支持

3.kafka 超高的吞吐量,消息较少时可能会有延迟(kafka是堆积一波消息后发送)

二.消息队列模型

1.常用的是topic订阅发布模型

发布订阅又有两种模式

集群消费方式 一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息。例如某个Topic有九条消息,其中一个Consumer Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer不指定消费方式的话默认是集群消费的,适用于大部分消息的业务广播消费方式 一条消息被多个Consumer消费,即使这些Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer消费一次,广播消费中的ConsumerGroup概念可以认为在消息划分层面没有意义,适用于一些分发消息的场景,比如我订单下单成功了,需要通知财务系统,客服系统等等这种分发的场景,可以通过修改Consumer中的MessageModel来设置消费方式为广播消费

同一个topic下,不同的consumer均能收到消息,同一个consumer(group)则只能有一个消费者收到消息【默认集群模式下,如果是广播模式则都能消费到】 在这里插入图片描述

2.点对点模型

生产者发送的消息,已有一个消费者都收到 在这里插入图片描述

三.启动的时候从哪里消费

当新实例启动的时候,PushConsumer 会拿到本消费组 broker 已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次 Pull 请求。

如果这个消费进度在 Broker 并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:

CONSUMEFROMLAST_OFFSET:默认策略,从该队列最尾开始消费,即跳过历史消息

CONSUMEFROMFIRST_OFFSET:从队列最开始开始消费,即历史消息(还储存在 broker 的)全部消费一遍

CONSUMEFROMTIMESTAMP:从某个时间点开始消费,和 setConsumeTimestamp() 配合使用,默认是半个小时以前

四.topic分区,如何保证消息顺序消费

在这里插入图片描述 rocketmq、kafka等mq均有分区的概念。

分区是为了为了性能考虑,如果topic内的消息只存于一个broker,那这个broker会成为瓶颈,无法做到水平扩展。所以把topic内的数据分布到整个集群就是一个自然而然的设计方式。broker的引入就是解决水平扩展问题的一个方案。

生产者发送的时候可以指定一个key选择同一个Queue,则这一批消息的消费将是顺序消息(并由同一个consumer完成消息)

例如:电商的订单创建,以订单 ID 作为 Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。

既保证业务的顺序,同时又能保证业务的高性能。

五.消息队列发送过程,如果保证消息队列不丢失消息

生产者端重试

向broker发送消息时,如果由于网络抖动等原因导致消息发送失败,可以设置失败重试次数让消息重发

消费者端重试

由于网络等原因导致消息没法从broker发送到消费者端,此时MQ会重试直到发送成功(集群模式)

确保消费成功再ack,关闭自动ack设置手动ack。如果消费者端在执行后续消息处理后因为网络原因队列未收到ack,为了保证消息是肯定被至少消费成功一次,RocketMQ 会进行重试,把这批消息在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个 ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到 DLQ 死信队列。应用可以监控死信队列来做人工干预

自动ack消费者收到消息就会就会ack更新位移手动ack消费者消费完毕成功才会ack更新位移

在这里插入图片描述

六.如何解决重复消费

常用的消息队列都能确保消息到达,但是不能保证唯一性,所以可能存在重复数据

生产者成功发送消息给队列时,队列会返回ack给生产者,但是当网络出现问题,队列成功收到消息,但是ack出现问题。生产者一般会重发消息,所以会导致队列中存在多条重复消息。

此外,如果消费者事务提交,但是返回ack网络出现问题,导致队列未收到ack,那么队列会重复发消息给消费者

保证接口的幂等性

乐观锁

唯一索引

记录每条被消费的消息的状态

七.消息队列实现分布式事务

事务消息 在这里插入图片描述

发送方先向 mq 发送一条 prepare 消息,如果 prepare 消息发送失败,则直接取消操作 如果消息发送成功,则执行本地事务如果本地事务执行成功,则想 mq 发送一条 confirm 消息,如果发送失败,则发送回滚消息订阅方定期消费 mq 中的 confirm 消息,执行本地事务,并发送 ack 消息。如果 B 系统中的本地事务失败,会一直不断重试,如果是业务失败,会向 A 系统发起回滚请求mq会提供一个消息回查的功能,会定期轮询那些未确认的 prepared 消息检查本地事务,如果该 prepare 消息本地事务处理成功,则重新发送 confirm 消息,否则直接回滚该消息 八.消息积压

原因:

消费者消费消息的速度比不上生产者发送消息的速度

解决办法:

1 如果有慢sql慢逻辑等情况,优先修复问题。慢逻辑是否能异步执行

2 增加单节点线程数,或者增加消费者机节点横向扩展)

注:使用多线程来处理消费消息,因为如果某线程异常了不会影响主线程,到最后主线程消费者已ack给队列,消息已被删除,数据就无法恢复了

参考文章: RocketMQ——角色与术语详解 RocketMQ官方文档 分布式消息队列RocketMQ&Kafka – 消息的“顺序消费”-- 一个看似简单的复杂问题 RocketMQ & Kafka 消息消费与消息重试 分布式事务,这一篇就够了



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3