如何有选择地从 AMQP(RabbitMQ)队列中删除消息?-开发者之家

您所在的位置:网站首页 rabbitmq批量删除队列 如何有选择地从 AMQP(RabbitMQ)队列中删除消息?-开发者之家

如何有选择地从 AMQP(RabbitMQ)队列中删除消息?-开发者之家

2023-03-29 18:07| 来源: 网络整理| 查看: 265


我想有选择地从 AMQP 队列中删除消息,甚至不读取它们.

I'd like to selectively delete messages from an AMQP queue without even reading them.


发送方希望基于 X 类型的新信息到达的事实使 X 类型的消息过期.因为订阅者很可能还没有消费 X 类型的最新消息,所以发布者应该删除以前的 X 类型消息并将最新的消息放入队列中.整个操作对订阅者应该是透明的——事实上他应该使用像 STOMP 这样简单的东西来获取消息.

Sending side wants to expire messages of type X based on a fact that new information of type X arrived. Because it's very probable that the subscriber didn't consume latest message of type X yet, publisher should just delete previous X-type messages and put a newest one into the queue. The whole operation should be transparent to the subscriber - in fact he should use something as simple as STOMP to get the messages.

如何使用 AMQP 做到这一点?或者也许在另一个消息传递协议中更方便?

How to do it using AMQP? Or maybe it's more convenient in another messaging protocol?


I'd like to avoid a complicated infrastructure. The whole messaging needed is as simple as above: one queue, one subscriber, one publisher, but the publisher must have an ability to ad-hoc deleting the messages for a given criteria.

发布者客户端将使用 Ruby,但实际上,只要我发现协议中的方法,我就会处理任何语言.

The publisher client will use Ruby but actually I'd deal with any language as soon as I discover how to do it in the protocol.


您目前无法在 RabbitMQ(或更一般地,在 AMQP 中)自动执行此操作.但是,这里有一个简单的解决方法.

You cannot currently do this in RabbitMQ (or more generally, in AMQP) automatically. But, here's an easy workaround.

假设您要发送三种类型的消息:Xs、Ys 和 Zs.如果我正确理解您的问题,当 X 消息到达时,您希望代理忘记所有其他尚未传递的 X 消息.

Let's say you want to send three types of messages: Xs, Ys and Zs. If I understand your question correctly, when an X message arrives, you want the broker to forget all other X messages that haven't been delivered.

这在 RabbitMQ 中相当容易做到:

This is fairly easy to do in RabbitMQ:

生产者声明了三个队列:X、Y 和 Z(它们会自动绑定到默认交换器,并将它们的名称作为路由键,这正是我们想要的),发布消息时,生产者首先清除相关队列(因此​​,如果发布 X 消息,则首先清除 X 队列);这有效地删除了过时的消息,消费者只是从它想要的队列中消费(X 表示 X 条消息,Y 表示 Y 条消息,等等);从它的角度来看,它只需要执行 basic.get 即可获得下一条相关消息.


This implies a race condition when two producers send the same type of message at the about the same time. The result is that its possible for the a queue to have two (or more) messages at the same time, but since the number of messages is upper-bounded by the number of producers, and since the superfluous messages are purged on the next publish, this shouldn't be much of a problem.

总而言之,这个解决方案比最优解决方案多了一个步骤,即在发布类型 X 的消息之前清除队列 X.

To summarize, this solution has just one extra step from the optimal solution, namely purge queue X before publishing a message of type X.

如果您在设置此配置时需要任何帮助,寻求建议的最佳地点是 rabbitmq-discuss 邮件列表.

If you need any help setting up this configuration, the perfect place to ask for advice is the rabbitmq-discuss mailing list.




CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3