Kafka

您所在的位置:网站首页 Kafka订阅java实现 Kafka

Kafka

2023-08-18 17:44| 来源: 网络整理| 查看: 265

原文网址:Kafka--延迟队列--使用/实现/原理_IT利刃出鞘的博客-CSDN博客

简介

本文介绍Kafka如何使用延迟队列的功能。

Kafka是很常用的消息队列,但Kafka本身是没有延迟队列功能的(RabbitMQ、RocketMQ有延迟队列功能)。本文介绍如何手动给Kafka添加延迟消息的功能。

虽然Kafka内部有时间轮,支持延时操作,例如:延迟生产、延迟拉取以及延迟删除,但这是Kafka自己内部使用的,用户无法将其作为延迟队列来使用。

本内容也是Java后端面试常问的问题 。

方案描述 方案概述

kafka作为一个高性能的消息队列,只要消费能力足够,发出的消息都是会立刻被收到的,因此需要想一个办法,让消息延迟发送出去。

方案如下:

第1步:发送延迟消息时不直接发送到目标topic,而是发送到一个用于处理延迟消息的topic,例如delay-minutes-1第2步:写一段代码拉取delay-minutes-1中的消息,将满足条件的消息发送到真正的目标主题里。 如何延迟发送

延迟消息发出去之后,代码程序就会立刻收到延迟消息,要如何处理才能让延迟消息等待一段时间才发送到真正的topic里面?

不能用sleep

有同学会说很简单嘛,在程序收到消息后判断若条件不满足,就调用sleep方法,过一段时间再进行下一个循环拉取消息。但这样是不行的,原因如下:

在轮询kafka拉取消息的时候,kafka会返回由max.poll.records配置指定的一批消息,当程序不能在max.poll.interval.ms配置的期望时间内处理这些消息的话,kafka就会认为这个消费者已经挂了,会进行rebalance,同时你这个消费者就无法再拉取到任何消息了。

举个例子:当你需要一个24小时的延迟消息队列,在代码里面写下了Thread.sleep(1000*60*60*24);,为了不发生rebalance,你把max.poll.interval.ms 也改成了1000*60*60*24,这个时候你或许会感觉到一丝丝的怪异,我是谁?我在哪?我为什么要写出来这样的代码?

上边只是部分内容,为便于维护,本文已迁移到此地址:Kafka延迟队列的实现方式 - 自学精灵



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3