一步步解决spring |
您所在的位置:网站首页 › kafka消息丢失 › 一步步解决spring |
kafka的高性能、流式响应、对大数据支持是有目共睹的,在我们内部项目product-search有使用CDC的解决方案,而kafka成为该方案的输出源,此时各个业务方只需要去订阅消费即可。 1. 问题描述有一天有开发人员找到我:我的consumer端消费kafka为何有时候可以消费到数据,有时候不行?dev环境我自己试了是没有问题的,而test环境就不行,不行的时候系统也没有任何报错信息,why? 2. 分析症状说实话,每当遇到这样的问题(连个error都没有),内心是相当的囧,能怎么办?只能立刻马上冷静下来,然后去现场寻找蛛丝马迹。尴尬的时刻到了,了解下来故障现场确实没有留下什么迹象!但还是有些可疑点 3. 去伪求真其实就是排除可疑点,巧合的事情就是最大的可能性。接下来就要按套路来解决问题: 1. kafka server是否真的数据? 很容易确认(借助一些kafka监控),基本可以排除 2. kafka client版本问题,使用方法问题,导致无法消费? Spring for Apache Kafka Version Spring Integration for Apache Kafka Version kafka-clients 2.2.x 3.1.x 2.0.0, 2.1.0 2.1.x 3.0.x 1.0.x, 1.1.x, 2.0.0 2.0.x 3.0.x 0.11.0.x, 1.0.x 1.3.x 2.3.x 0.11.0.x, 1.0.x 1.2.x 2.2.x 0.10.2.x 1.1.x 2.1.x 0.10.0.x, 0.10.1.x 1.0.x 2.0.x 0.9.x.x N/A* 1.3.x 0.8.2.2 org.springframework.kafka spring-kafka 1.1.1.RELEASEapplication.yml(见下面) spring: profiles: default kafka: bootstrap-servers: 192.168.1.183:9092,192.168.1.184:9092,192.168.1.182:9092 consumer: group-id: opa-consumer-group auto-offset-reset: earliest enable-auto-commit: false auto-commit-interval: 100 @Component public class KafkaReceiver { private static final String CDC_TOPIC = "BEEHIVE_XKDATA"; private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReceiver.class); @KafkaListener(topics = {CDC_TOPIC}) public boolean consumer(ConsumerRecord record) { Optional kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); //todo。。。略 } return true; } }首先,版本是没有问题的,kafka driver可以直接去官方网站去确认, 用法(配置和代码),也是官方Samples,我擦这么简单的不可能有问题,基本上此项也可以排除。 3. kafka client可能被别的consumer消费掉? 这个无法识别,需要对源码有一定了解的情况下才可以确认,懂源码不是让你改源码,更多是是让你埋日志(如果它有debug日志就打开,如果不够就把源码拉下来放自己项目中,加日志)。
基本上已经可以确认有问题!我发现:被bean A消费了,这个bean A(它继承了KafkaReceiver),bean A被别的类库扫描到了spring ioc中去了,那这又会有什么问题呢?后续源码会讲到,它不能这么做 4. 读源码1. 要对kafka-driver有一个基本的认识,它是poll模型 2. 对spring-boot有一定了解的都知道,入口在spring.factories(spring-boot-autoconfigure),找到启动类org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,\ 3.看代码 KafkaProperties配置类KafkaTemplate生产者生产消息用的,主要有send()ProducerListener生产者监听接口,主要有onSuccess()、onError()。默认实现LoggingProducerListener,它打印错误消息ConsumerFactory主要是createKafkaConsumer()ProducerFactory主要是createProducer()ConcurrentKafkaListenerContainerFactoryConfigurer配置ContainerProperties(包含了一个监听容器的运行时配置信息,主要定义了监听的主题、分区、初始化偏移量,还有消息监听器)ConcurrentKafkaListenerContainerFactory创建createContainerInstanceKafkaBootstrapConfigurationKafkaListenerAnnotationBeanPostProcessor:处理各种注解@KafkaListener,它是核心,创建都是它驱动的 KafkaListenerEndpointRegistry:它实现SmartLifecycle(start、stop),掌管MessageListenerContainer的start和stop 4.1 MessageListener接口实现onMessage方法,去消费接收到的消息。两种方案: MessageListener 消费完消息后自动提交offset(enable.auto.commit=true时),可提高效率,存在消费失败但移动了偏移量的风险。AcknowledgingMessageListener 消费完消息后手动提交offset(enable.auto.commit=false时)效率降低,无消费失败但移动偏移量的风险MessagingMessageListenerAdapter的实现有:RecordMessagingMessageListenerAdapter、BatchMessagingMessageListenerAdapter,他们是由ConcurrentKafkaListenerContainerFactory创建的。 它继承自抽象类AbstractMessageListenerContainer,核心方法doStart()可以创建多个consumer(KafkaMessageListenerContainer),实例化方式ConcurrentMessageListenerContainer(ConsumerFactory consumerFactory,ContainerProperties containerProperties)。 4.3 执行流程上面讲过,每个consumer都有一个这样的实例,核心方法是start(),其实KafkaMessageListenerContainer通过内部类ListenerConsumer和提交监听消费者任务。 1.定义消费者订阅topic或者指定分区 2.设置监听器,支持4种: 1)BatchAcknowledgingMessageListener批量需确认消息监听器 2)AcknowledgingMessageListener需确认消息监听器 3)BatchMessageListener批量消息监听器 4)MessageListener消息监听器(用的最多,一次消费一条) 提交监听消费者任务(ListenerConsumer),返回Future并赋值。这里我们看一下任务Runnable接口的run方法,分两种情况: 1.如果自定义了分区,没必要再平衡分配分区了,直接回调 2.未指定分区,进入自旋消费 ListenerInvoker.startInvoker()触发onMessage(), 1.如果用户自定义了分区且非自动提交,那么开启异步线程执行ListenerInvoker任务。 2.未指定分区,进入自旋 1)拉取消费记录 ConsumerRecords records = this.consumer.poll(this.containerProperties.getPollTimeout()); 2)如果设置了自动提交,直接在当前线程执行 invokeListener(records); 3)否则发送消息进缓存队列 sendToListener(records) |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |