消息队列的坑(重复消费、顺序消费、消息丢失) |
您所在的位置:网站首页 › 最常用的消息队列有哪些方法呢 › 消息队列的坑(重复消费、顺序消费、消息丢失) |
一.常用的一些消息队列
1.rabbitmq erlang语言开发,时效性最高 2.rocketmq 吞吐量高,时效性高,实现了事务消息,但在大数据方面需要自己写代码支持 3.kafka 超高的吞吐量,消息较少时可能会有延迟(kafka是堆积一波消息后发送) 二.消息队列模型1.常用的是topic订阅发布模型 发布订阅又有两种模式 集群消费方式 一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息。例如某个Topic有九条消息,其中一个Consumer Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer不指定消费方式的话默认是集群消费的,适用于大部分消息的业务广播消费方式 一条消息被多个Consumer消费,即使这些Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer消费一次,广播消费中的ConsumerGroup概念可以认为在消息划分层面没有意义,适用于一些分发消息的场景,比如我订单下单成功了,需要通知财务系统,客服系统等等这种分发的场景,可以通过修改Consumer中的MessageModel来设置消费方式为广播消费同一个topic下,不同的consumer均能收到消息,同一个consumer(group)则只能有一个消费者收到消息【默认集群模式下,如果是广播模式则都能消费到】 2.点对点模型 生产者发送的消息,已有一个消费者都收到 当新实例启动的时候,PushConsumer 会拿到本消费组 broker 已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次 Pull 请求。 如果这个消费进度在 Broker 并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择: CONSUMEFROMLAST_OFFSET:默认策略,从该队列最尾开始消费,即跳过历史消息 CONSUMEFROMFIRST_OFFSET:从队列最开始开始消费,即历史消息(还储存在 broker 的)全部消费一遍 CONSUMEFROMTIMESTAMP:从某个时间点开始消费,和 setConsumeTimestamp() 配合使用,默认是半个小时以前 四.topic分区,如何保证消息顺序消费
分区是为了为了性能考虑,如果topic内的消息只存于一个broker,那这个broker会成为瓶颈,无法做到水平扩展。所以把topic内的数据分布到整个集群就是一个自然而然的设计方式。broker的引入就是解决水平扩展问题的一个方案。 生产者发送的时候可以指定一个key选择同一个Queue,则这一批消息的消费将是顺序消息(并由同一个consumer完成消息) 例如:电商的订单创建,以订单 ID 作为 Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。 既保证业务的顺序,同时又能保证业务的高性能。 五.消息队列发送过程,如果保证消息队列不丢失消息生产者端重试 向broker发送消息时,如果由于网络抖动等原因导致消息发送失败,可以设置失败重试次数让消息重发消费者端重试 由于网络等原因导致消息没法从broker发送到消费者端,此时MQ会重试直到发送成功(集群模式) 确保消费成功再ack,关闭自动ack设置手动ack。如果消费者端在执行后续消息处理后因为网络原因队列未收到ack,为了保证消息是肯定被至少消费成功一次,RocketMQ 会进行重试,把这批消息在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个 ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到 DLQ 死信队列。应用可以监控死信队列来做人工干预 自动ack消费者收到消息就会就会ack更新位移手动ack消费者消费完毕成功才会ack更新位移常用的消息队列都能确保消息到达,但是不能保证唯一性,所以可能存在重复数据 生产者成功发送消息给队列时,队列会返回ack给生产者,但是当网络出现问题,队列成功收到消息,但是ack出现问题。生产者一般会重发消息,所以会导致队列中存在多条重复消息。 此外,如果消费者事务提交,但是返回ack网络出现问题,导致队列未收到ack,那么队列会重复发消息给消费者 保证接口的幂等性 乐观锁 唯一索引 记录每条被消费的消息的状态 七.消息队列实现分布式事务事务消息 原因: 消费者消费消息的速度比不上生产者发送消息的速度 解决办法: 1 如果有慢sql慢逻辑等情况,优先修复问题。慢逻辑是否能异步执行 2 增加单节点线程数,或者增加消费者机节点横向扩展) 注:使用多线程来处理消费消息,因为如果某线程异常了不会影响主线程,到最后主线程消费者已ack给队列,消息已被删除,数据就无法恢复了 参考文章: RocketMQ——角色与术语详解 RocketMQ官方文档 分布式消息队列RocketMQ&Kafka – 消息的“顺序消费”-- 一个看似简单的复杂问题 RocketMQ & Kafka 消息消费与消息重试 分布式事务,这一篇就够了 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |