MQ中的数据丢失处理办法

您所在的位置:网站首页 xbox数据包丢失怎么办 MQ中的数据丢失处理办法

MQ中的数据丢失处理办法

2024-07-11 10:59| 来源: 网络整理| 查看: 265

MQ 消息队列中的数据丢失处理办法 img 2.丢消息。

这得从java的java.net.SocketException异常说起。简单点说就是当网络发送方发送一堆数据,然后调用close关闭连接之后。这些发送的数据都在接收者的缓存里,接收者如果调用read方法仍旧能从缓存中读取这些数据,尽管对方已经关闭了连接。但是当接收者尝试发送数据时,由于此时连接已关闭,所以会发生异常,这个很好理解。不过需要注意的是,当发生SocketException后,原本缓存区中数据也作废了,此时接收者再次调用read方法去读取缓存中的数据,就会报Software caused connection abort: recv failed错误。

通过抓包得知,ActiveMQ会每隔10秒发送一个心跳包,这个心跳包是服务器发送给客户端的,用来判断客户端死没死。如果你看过上面第一条,就会知道非持久化消息堆积到一定程度会写到文件里,这个写的过程会阻塞所有动作,而且会持续20到30秒,并且随着内存的增大而增大。当客户端发完消息调用connection.close()时,会期待服务器对于关闭连接的回答,如果超过15秒没回答就直接调用socket层的close关闭tcp连接了。这时客户端发出的消息其实还在服务器的缓存里等待处理,不过由于服务器心跳包的设置,导致发生了java.net.SocketException异常,把缓存里的数据作废了,没处理的消息全部丢失。

解决方案:用持久化消息,或者非持久化消息及时处理不要堆积,或者启动事务,启动事务后,commit()方法会负责任的等待服务器的返回,也就不会关闭连接导致消息丢失了。

一种是用MQ的事务,但是有个缺点,是阻塞的,影响性能

另一种是用confirm模式(优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可)

​ 方式一:channel.waitForConfirms()普通发送方确认模式;

​ 方式二:channel.waitForConfirmsOrDie()批量确认模式;

​ 方式三:channel.addConfirmListener()异步监听发送方确认模式;另一种是用confirm模式(优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可)

​ 方式一:channel.waitForConfirms()普通发送方确认模式;

​ 方式二:channel.waitForConfirmsOrDie()批量确认模式;

​ 方式三:channel.addConfirmListener()异步监听发送方确认模式;

1.1.1、生产者弄丢了数据

生产者将数据发送到 ActiveMQ的时候,可能因为网络问题等原因,在生成的过程中数据丢失。

解决方案

​ 基于生成者发送消息自己不知道是否发送成功的情况,有如下两种解决方式。

​ 事务机制:可以选择用 activeMQ提供的事务功能,就是生产者发送数据之前开启 activeMQ事务。Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 但是问题来了,如果使用activeMQ事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能。

​ 非事务机制(消息持久化,confirm 机制):事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

producer.setDeliveryMode(factory.getDeliveryMode())

所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。

1.1.2、ActiveMQ弄丢了数据

当生产者发送消息到ActiveMQ,Active没进行消失持久化时挂掉了,此时ActiveMQ重启的时候发送过来的数据就丢失了。解决方案是要将接收到的消息进行持久化操作。

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

ActiveMQ不设置,默认就是持久的.

虽然都保存到了文件里,但它和持久化消息的区别是,重启后持久化消息会从文件中恢复,非持久化的临时文件会直接删除。

1.1.3、消费者弄丢了数据

​ activeMQ如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。

​ 自动重试机制:自动确认,收到消息以后,自动应答并且消费成功了。 如果有异常不会自动应答,并且会重发6次。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

吞吐量:联想到机场吞吐量,表示的是一定时间飞机的起降架次和旅客的运输数量。在系统就表示一定时间内请求获得响应的数量,反应的是系统的处理性能。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3