Springboot集成Kafka完成消息回调,错误处理,消息拦截,批量处理 |
您所在的位置:网站首页 › load回调函数错误处理 › Springboot集成Kafka完成消息回调,错误处理,消息拦截,批量处理 |
这篇文章主要是分享我做消息回调,错误处理,消息拦截,批量处理的一些代码,希望大家给我指出不足之处。 消息回调这个简单,你只需要实现一个接口:ProducerListener @Component public class KafkaSendResultHandler implements ProducerListener { @Override public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { //成功时你的处理 } @Override public void onError(ProducerRecord producerRecord, Exception exception) { //失败时你的处理 } } 监听时的错误处理这个也很简单,自己写一个方法写上你的业务处理,并且在监听时将其配置就好,如果是批量处理时出错,最好有一个单独的出错处理,否则可能会打很多无用日志。 @Component public class DefaultConsumerAwareListenerErrorHandler { @Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() { return (message, e, consumer) -> { //错误处理 return msg; }; } } 消息拦截实现消息拦截首先实现RecordFilterStrategy接口,在接口中写上自己的过滤规则,true为舍弃,false为保留,但是只是这样的话,对于代码的拓展性是不太好的,因此我们可以通过接口回调来实现我们的自定义消息拦截规则: @Component public class DefaultRecordFilterStrategy implements RecordFilterStrategy { 自己定义的接口,可通过实现该接口来实现自定义过滤规则来过滤规则。 CustomFilter customFilter; @Override public final boolean filter(ConsumerRecord consumerRecord) { //先判断幂等性,幂等性判断消息是否为重发 /** * 幂等为true,返回true */ if(判断幂等){ return true; } //再判断自定义规则 return customFilter.rule(consumerRecord); } } 批量处理批量处理的话,需要实现关闭消息的自动应答,因此需要另建一个工厂: KafkaConsumerConfig.java @Bean("batchContainerFactory") public ConcurrentKafkaListenerContainerFactory listenerContainer() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory(batchConsumerConfigs())); factory.setAckDiscarded(true); factory.setRecordFilterStrategy(defaultRecordFilterStrategy); //设置并发量,小于或等于Topic的分区数 factory.setConcurrency(maxPollRecords); //设置为批量监听 factory.setBatchListener(true); return factory; } //批量消费配置 @Bean public Map batchConsumerConfigs() { Map props = new HashMap(); if(bootstrapServers.equals("aliyun")){ bootstrapServers = CommonParam.aliyunServer; }else if(bootstrapServers.equals("dev")){ bootstrapServers = CommonParam.DEVServer; }else{ bootstrapServers = CommonParam.aliyunServer; } //自动提交的频率 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); //连接地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //是否开启自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); //一次拉取消息数量 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); //连接超时时间 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); //键反序列化方式 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); //值反序列化方式 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; }Listener.java @KafkaListener(topics = "#{kafkaBatchTopicName}", groupId = "#{topicGroupId}", containerFactory = "batchContainerFactory", errorHandler = "batchConsumerAwareErrorHandler") public void batchListen(List list) { for (String s : list) { Message message = new GsonBuilder().create().fromJson(s, Message.class); } }希望大家可以指出我的不足之处,共同进步[手动加油] |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |