kafka进阶:延时队列

您所在的位置:网站首页 topic的定义 kafka进阶:延时队列

kafka进阶:延时队列

2023-03-06 00:14| 来源: 网络整理| 查看: 265

本文已参与「新人创作礼」活动,一起开启掘金创作之路。

在kafka中要如何实现延时队列呢?很遗憾kafka官方到目前为止并没有给出延时队列的直接实现。也就是说我们需要自己来实现这一部分的功能。

延时队列

实现的原理描述如下:

根据业务需要,自定义几个特殊主题。比如:topic-delay-1s,topic-delay-5s,topic-delay-5m,topic-delay-30m。分别表示延迟1秒,延迟5秒,延迟5分钟,延迟30分钟。当然具体延迟多少需要根据你自己的业务来设定。但有一点就是延迟的时间间隔是需要固定的。

自己实现一个delay service。可以是java或者其它语言实现。delay service 同时消费上面定义的主题,然后等待延时时间的到达。时间到达后,把消息转发至业务主题。

业务处理正常消费业务主题,处理消息

举个例子,比如火车票订单,如果30分钟内未支付,则自动取消订单。那么用时序图描述延时处理过程就是正面这样的。

image.png

注意事项

实现delay service 里需要注意:

因为consumer是单线程的,所以理论上有多少个延迟主题,就需要创建多少个线程。像上面topic-delay-1s,topic-delay-5s,topic-delay-5m,topic-delay-30m 这4种主题,就需要4个线程来处理。延迟主题少问题不大,但延迟主题如果比较多的话,还是比较难受的。

需要手动提交偏移量,因为delay service 可能会因为升级或者故障,导致重启,这个时候消息是不能漏的,所以一定要消息已经转发到业务主题后,再提交偏移量,防止漏消息。

重复消息的处理,因为delay service 需要先把消息发送到业务 主题,再提交偏移量,就有可能出现 消息发送到业务主题后,还没来的及发送偏移量,delay service 就因电力故障无法正常服务了,下次重启后,就可能继续发送已经发送过的消息。所以业务上建议是要做到幂等,以实现容错。

重试队列 与 死信队列

我们在 延时队列的基础上实现重试队列就比较简单了,当某个消息处理失败时,就把这个消息发送到 延时队列。等时间到了就会重试处理。如果处理成功,则结束。如果处理失败则重试次数加1,再一次进入延时队列。而如果超过了重试次数,则写入死信队列,作为记录。

这里说的重试队列,死信队列都是概念上的东西,kafka本身并不提供。我们是可以在使用层实现这一类概念。下面的时序图,是一个示例。我们假设有一个订单支付成功了,有积分逻辑需要处理,但重试三次后依然失败了。时序过程描述如下:

image.png

实现示例

我在github上面提交了,我的实现示例,这里简单说明一下。github地址: github.com/junfenglian…

项目中的kafka-delay-queue是核心代码,kafka-delay-service是springboot集成的示例。

这里简单讲一下集成的过程: 文件目录:

├── pom.xml └── src └── main ├── java │   └── cn │   └── genlei │   └── service │   └── Application.java └── resources └── application.yml 复制代码

一共就三个文件。

pom.xml org.springframework.boot spring-boot-starter-web cn.genlei kafka-delay-queue 1.0.0 复制代码

关键就是添加依赖,说明一下,因为 kafka-delay-queue 这个包还没有传到 maven的中心仓库中。所以需要先本地到kafka-delay-queue目录下通过 mvn install编译这个jar包。

application.yml kafka: servers: 127.0.0.1:9092 group-id: kafka-delay-group topics: 5000,topic-delay-5s;60000,topic-delay-1m 复制代码

注意修改 kafka服务器地址,对应你自己的kafka服务器地址。topics的格式是 延时 + 逗号 + 延时主题。多个配置用 分号 分隔。相关的主题需要先手动在kafka上创建。

Application.java package cn.genlei.service; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import cn.genlei.DelayQueue; import java.util.Properties; @SpringBootApplication public class Application implements CommandLineRunner { @Value("${kafka.servers}") String servers; @Value("${kafka.group-id}") String groupId; @Value("${kafka.topics}") String topics; public static void main(String[] args){ SpringApplication.run(Application.class); } @Override public void run(String... args) throws Exception { Properties properties = new Properties(); properties.put("bootstrap.servers",servers); properties.put("group.id", groupId); properties.put("topics", topics); DelayQueue delayQueue = new DelayQueue(properties); delayQueue.start(); } } 复制代码

这里关键的部分就是run方法内的内容。我们创建一个DelayQueue实例,然后调用它的start方法,就会开始延时队列的处理了。

上述代码并没有在生产环境长时间验证过,如果你需要用于生产,请谨慎使用。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3