Spring Kafka 消息重试和死信队列

您所在的位置:网站首页 兴和餐饮 Spring Kafka 消息重试和死信队列

Spring Kafka 消息重试和死信队列

2023-10-20 08:23| 来源: 网络整理| 查看: 265

Spring-kafka内部封装了可重试消费消息的语义,也就是可以设置为当消费数据出现异常时,重试这个消息。而且可以设置重试达到多少次后,让消息进入预定好的Topic。也就是死信队列里。

demo @Component @EnableScheduling public class DemoListener { private static final Logger logger = LoggerFactory.getLogger(DemoListener.class); @Autowired private ConsumerFactory consumerFactory; @Autowired private KafkaTemplate kafkaTemplate; @Bean public ConcurrentKafkaListenerContainerFactory containerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); // 最大重试次数3次 factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), 3)); return factory; } @KafkaListener(id = "group", topics = "testTopic", containerFactory = "containerFactory") public void listen(ConsumerRecord record, Acknowledgment ack) throws Exception { Optional kafkaMessage = Optional.ofNullable(record.value()); if (!kafkaMessage.isPresent()) { throw new Exception("监听到的消息为空值"); } logger.info("topicID: " + record.topic()); logger.info("recordValue: " + record.value()); try { /*业务逻辑*/ ack.acknowledge(); } catch (Exception e) { throw new Exception("消息异常,进入死信队列..."); } } @KafkaListener(id = "dltGroup", topics = "testTopic.DLT") public void dltListen(String input) { logger.info("Received from DLT: " + input); } }

demo中,监听testTopic主题的监听器在处理业务逻辑时如果触发运行时异常,监听器会重新尝试三次调用,当到达最大的重试次数后。消息就会被丢掉重试死信队列里面去。死信队列的Topic的规则是,业务Topic名字+“.DLT”。如上面业务Topic的name为“testTopic”,那么对应的死信队列的Topic就是“testTopic.DLT”



【本文地址】


今日新闻


推荐新闻


    CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3