消息幂等

您所在的位置:网站首页 rabbitma 消息幂等

消息幂等

2023-06-21 18:44| 来源: 网络整理| 查看: 265

如果消息重复消费会影响您的业务处理,要对消息做幂等处理。本文介绍消息幂等的概念、适用场景以及处理方法。

概念

在消息领域,幂等是指Consumer重复消费某条消息时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响。

例如,在支付场景下,Consumer消费扣款消息,对一笔订单执行扣款操作,扣款金额为500元。如果因网络不稳定等原因导致扣款消息重复投递,Consumer重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费500元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消息幂等。

适用场景

在互联网应用中,尤其在网络不稳定的情况下,分布式消息服务RabbitMQ的消息有可能会出现重复。如果消息重复消费会影响您的业务处理,请对消息做幂等处理。消息重复的可能原因如下:

发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时Producer意识到消息发送失败并尝试再次发送消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息。

投递时消息重复

消息消费的场景下,消息已投递到Consumer并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,分布式消息服务RabbitMQ的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息。

负载均衡时消息重复(包括但不限于网络抖动、服务端重启以及Consumer应用重启)

当分布式消息服务RabbitMQ的服务端或客户端重启、扩容或缩容时,会触发Rebalance,此时Consumer可能会收到重复消息。

处理方法

以Message ID为幂等键对消息进行幂等处理的步骤如下:

(1)在数据库中创建一张unique key索引为唯一Message ID的表。

(2)在Producer客户端为每条消息设置唯一Message ID。

设置唯一Message ID的示例代码如下:

AMQP.BasicProperties props =newAMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build(); channel.basicPublish("ExchangeName","RoutingKey",true, props,("消息发送"+ i).getBytes());

(3)在Consumer客户端根据唯一Message ID对消息进行幂等处理。

根据唯一Message ID进行幂等处理的示例代码如下:

channel.basicConsume(Producer.QueueName, false, "MyConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope env, AMQP.BasicProperties properties, byte[] body) throws IOException { // 1. 获取业务唯一性索引数据。 try{ String messageId = properties.getMessageId(); // Message ID或者其他作为unique key的信息。 // 2. 开启数据库事务。 idempTable.insert(messageId); // 3. 对接收到的消息,进行业务逻辑处理。 // 4. 提交或回滚事务。// 处理成功,则进行ACK,否则不要进行ACK。 channel.basicAck(env.getDeliveryTag(), false); } catch (数据库主键冲突异常 e){ // 重复消息,直接确认掉。 channel.basicAck(env.getDeliveryTag(), false); } } } );


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3