RocketMQ消费失败如何处理?如何保证消费消息的幂等性?

您所在的位置:网站首页 rocketmq清空消息 RocketMQ消费失败如何处理?如何保证消费消息的幂等性?

RocketMQ消费失败如何处理?如何保证消费消息的幂等性?

2022-05-14 11:44| 来源: 网络整理| 查看: 265

 

文章目录 1. 消息消费失败如何处理? 2. 如何保证消费消息的幂等性?

 

 

1. 消息消费失败如何处理?

当消费者从Broker获取到消息后会进行消费,并返回消费状态。如下代码所示

//broker推消息到Consumer consumer.registerMessageListener(new MessageListenerConcurrently() { //调用consumeMessage方法进行消费 @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); //返回消费状态 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });

正常消费时,返回CONSUME_SUCCESS,如果消费失败,会触发重试机制

        1.1 消费失败会触发消息重试机制

        首先对于广播模式的消息, 是不存在消息重试的机制的,即消息消费失败后,不会再重新进行发送,而只是继续消费新的消息。而对于普通的消息,当消费者消费消息失败后,你可以在消息监听器接口的实现中设置返回状态达到消息重试的结果。可以有三种设置方式:

消费成功:返回CONSUME_SUCCESS,如上代码所示 消费失败: 方式①:返回RECONSUME_LATER,消息将重试 方式②:返回null,消息将重试 方式③:直接抛出异常, 消息将重试 public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { //处理消息 doConsumeMessage(message); //方式1:返回 Action.ReconsumeLater,消息将重试 return Action.ReconsumeLater; //方式2:返回 null,消息将重试 return null; //方式3:直接抛出异常, 消息将重试 throw new RuntimeException("Consumer Message exceotion"); } }

如果消费失败不希望重试,可以直接返回Action.CommitMessage。

public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { try { doConsumeMessage(message); } catch (Throwable e) { //捕获消费逻辑中的所有异常,并返回 Action.CommitMessage; return Action.CommitMessage; } //消息处理正常,直接返回 Action.CommitMessage; return Action.CommitMessage; } }

        1.2 如何进行消息重试?

重试的消息会进入一个 “%RETRY%”+ConsumeGroup 的重试队列中。RocketMQ消费失败如何处理?如何保证消费消息的幂等性?_消息队列MQ        然后RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间与延迟消息的延迟级别是对应的。不过取的是延迟级别的后16级别。如果消息重试16次后仍然失败,消息将不再投递。转为进入死信队列。另外一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。

延时级别如下:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

注意:在老版本的RocketMQ中,一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。但是在4.7.1版本中,每次重试MessageId都会重建。

        然后关于这个重试次数,RocketMQ可以进行定制。例如通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2小时。这个配置对相同GroupID下的所有Consumer实例有效。并且最后启动的Consumer会覆盖之前启动的Consumer的配置。

        1.3 多次重试后仍然失败,消息进入死信队列

        当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。

死信队列的名称是%DLQ%+ConsumGroup,如下图RocketMQ消费失败如何处理?如何保证消费消息的幂等性?_消息队列MQ_02死信队列的特征:

死信队列中的消息不会再被消费者正常消费。 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。

        通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。

注意:

默认创建出来的死信队列,他里面的消息是无法读取的,在控制台和消费者中都无法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2,表示禁读。这个权限有三种

禁读:2 禁写:4 可读可写:6

所以需要手动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台配置)。

 

2. 如何保证消费消息的幂等性?

在MQ系统中,对于消息幂等有三种实现语义:

at most once 最多一次:每条消息最多只会被消费一次。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。 at least once 至少一次:每条消息至少会被消费一次。RocketMQ也有同步发送、事务消息等很多方式能够保证。 exactly once 刚刚好一次:每条消息都只会确定的消费一次,这个是MQ中最理想也是最难保证的一种,RocketMQ只能保证at least once,保证不了exactly once,所以,使用RocketMQ时,需要由业务系统自行保证消息的幂等性。

        2.1 幂等性存在的必要性

        在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,造成重复消费。这个重复简单可以概括为以下情况:

produce发送到Broker时消息重复 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者producer宕机,导致服务端Broker 对 producer应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。 Broker投递消息到Consumer时消息重复 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启) 当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发Rebalance重平衡机制,此时消费者可能会收到重复消息。

        2.2 幂等性解决方案

        从上面的分析中,我们知道,在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。

        但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。这个id可以使用分布式中间件redis,zookeeper等去生成。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3