Springboot集成Kafka完成消息回调,错误处理,消息拦截,批量处理

您所在的位置:网站首页 load回调函数错误处理 Springboot集成Kafka完成消息回调,错误处理,消息拦截,批量处理

Springboot集成Kafka完成消息回调,错误处理,消息拦截,批量处理

2024-07-12 16:49| 来源: 网络整理| 查看: 265

这篇文章主要是分享我做消息回调,错误处理,消息拦截,批量处理的一些代码,希望大家给我指出不足之处。

消息回调

这个简单,你只需要实现一个接口:ProducerListener

@Component public class KafkaSendResultHandler implements ProducerListener { @Override public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { //成功时你的处理 } @Override public void onError(ProducerRecord producerRecord, Exception exception) { //失败时你的处理 } } 监听时的错误处理

这个也很简单,自己写一个方法写上你的业务处理,并且在监听时将其配置就好,如果是批量处理时出错,最好有一个单独的出错处理,否则可能会打很多无用日志。

@Component public class DefaultConsumerAwareListenerErrorHandler { @Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() { return (message, e, consumer) -> { //错误处理 return msg; }; } } 消息拦截

实现消息拦截首先实现RecordFilterStrategy接口,在接口中写上自己的过滤规则,true为舍弃,false为保留,但是只是这样的话,对于代码的拓展性是不太好的,因此我们可以通过接口回调来实现我们的自定义消息拦截规则:

@Component public class DefaultRecordFilterStrategy implements RecordFilterStrategy { 自己定义的接口,可通过实现该接口来实现自定义过滤规则来过滤规则。 CustomFilter customFilter; @Override public final boolean filter(ConsumerRecord consumerRecord) { //先判断幂等性,幂等性判断消息是否为重发 /** * 幂等为true,返回true */ if(判断幂等){ return true; } //再判断自定义规则 return customFilter.rule(consumerRecord); } } 批量处理

批量处理的话,需要实现关闭消息的自动应答,因此需要另建一个工厂: KafkaConsumerConfig.java

@Bean("batchContainerFactory") public ConcurrentKafkaListenerContainerFactory listenerContainer() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory(batchConsumerConfigs())); factory.setAckDiscarded(true); factory.setRecordFilterStrategy(defaultRecordFilterStrategy); //设置并发量,小于或等于Topic的分区数 factory.setConcurrency(maxPollRecords); //设置为批量监听 factory.setBatchListener(true); return factory; } //批量消费配置 @Bean public Map batchConsumerConfigs() { Map props = new HashMap(); if(bootstrapServers.equals("aliyun")){ bootstrapServers = CommonParam.aliyunServer; }else if(bootstrapServers.equals("dev")){ bootstrapServers = CommonParam.DEVServer; }else{ bootstrapServers = CommonParam.aliyunServer; } //自动提交的频率 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); //连接地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //是否开启自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); //一次拉取消息数量 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); //连接超时时间 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); //键反序列化方式 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); //值反序列化方式 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; }

Listener.java

@KafkaListener(topics = "#{kafkaBatchTopicName}", groupId = "#{topicGroupId}", containerFactory = "batchContainerFactory", errorHandler = "batchConsumerAwareErrorHandler") public void batchListen(List list) { for (String s : list) { Message message = new GsonBuilder().create().fromJson(s, Message.class); } }

希望大家可以指出我的不足之处,共同进步[手动加油]



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3