【kafka专栏】spring kafka处理反序列化异常(毒丸)及消费过程异常

您所在的位置:网站首页 kafka消费异常的处理 【kafka专栏】spring kafka处理反序列化异常(毒丸)及消费过程异常

【kafka专栏】spring kafka处理反序列化异常(毒丸)及消费过程异常

2024-07-02 23:51| 来源: 网络整理| 查看: 265

一、 处理毒丸(Poison Pill)消息

什么情况下可能会造成毒丸(Poison Pill)问题?,我们举一个例子:

某主题之前对应的数据结构一直是User对象(JSON序列化),某天由于程序修改错误,一不小心向该主题发送了若干条字符串消息这些字符串消息无法被反序列化为JSON,出现毒丸(Poison Pill)现象,Consumer会卡在“反序列化失败-重试-反序列化失败”的死循环中,无法再处理后续消息。

当然造成毒丸问题可不是只有上面这一种可能,我们该如何处理呢?要知道kafka是不支持删除数据的。使用ErrorHandlingDeserializer处理反序列化失败,在application.yaml 中配置ErrorHandlingDeserializer反序列化器。 在这里插入图片描述

将Consumer的key-deserializer 和 value-deserializer 都配置为 org.springframework.kafka.support.serializer.ErrorHandlingDeserializer 并委任具体的Key和Value反序列化器: spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer

spring: kafka: consumer: auto-offset-reset: earliest key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer properties: spring.json.trusted.packages: '*' spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer

在Key或Value反序列化失败时,先有delegate代理配置的反序列化器进行反序列化。如果反序列化失败,ErrorHandlingDeserializer 可以确保毒丸(Poison Pill)消息被处理掉并记录日志,Consumer offeset可以向前移动,使得Consumer可以继续处理后续的消息。

二、数据消费过程异常处理类

除了再反序列化过程中出现异常,还有可能我们的消费者程序处理数据过程中出现异常,同样有全局的异常处理机制可以使用。 实现KafkaListenerErrorHandler接口对监听器出现的异常进行处理。

@Component public class MyErrorHandler implements KafkaListenerErrorHandler { @Override public Object handleError(Message message, ListenerExecutionFailedException exception) { return null; } @Override public Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer) { //do someting return null; } }

配置使用方法如下

@KafkaListener(errorHandler="myErrorHandler") public void readMsg(@Payload ConsumerRecord consumerRecord) { //所有的异常全部对外抛出,不要处理,由myErrorHandler统一处理 }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3