3、kafka常见问题

您所在的位置:网站首页 kafka超时异常 3、kafka常见问题

3、kafka常见问题

2024-07-12 05:31| 来源: 网络整理| 查看: 265

一、消息丢失或重复【重点】 1、生产者ack策略

ack设置为0:0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。

ack设置为1:1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。

ack设置为all:all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。

2、消费者消息重复或丢失

消费者消息丢失或者重复可能存在下面3种情况。1、生产者消息未幂等,出现消息丢失或者重复的场景。2、消费者消费消息未提交偏移量。 3、消费者提交了偏移量未完全消费消息。

2.1、解决生产者消息重复丢失

kafka版本Kafka>=0.11版本才支持幂等。如果使用版本低于0.11,可以考虑升级kafka或者手工实现幂等。开启生产者幂等是解决生产者消息不丢失不重复,但是无法解决消费端导致的消息重复丢失问题。

retries=Integer.MAX_VALUE  //消息重试次数max.in.flight.requests.per.connection=1 (Kafka >= v0.11 & < v1.1) //生产者消息排队队列大小为1防止乱序acks=allenable.idempotence =  true //开启幂等

2.1.1、Retry【生产重试】

有时候Producer发送Message失败可能并不是因为Broker挂了,可能是因为网络问题,没有连接到Broker等等。这种问题可能在很短暂的时间内就会自动修复,那么在这种情况下,我们希望Producer在发送失败后能重新尝试发送。这里就需要设置retries这个参数,意思就是重试的次数,默认是0次,可以根据实际业务情况设置。 当设置了retries参数大于0后,有可能会带来新的问题。假如我们需要相同Key的Message进入特定的Partition,并且是要严格按照Producer生产Message的顺序排序。那么此时如果第一条Message发送失败,第二条Message发送成功了,第一条通过重试发送成功了,那Message的顺序就发生了变化。

2.1.2、per【生产顺序】

这里又会引出一个参数max.in.flight.requests.per.connection,这个参数默认是5,意思是在被Broker阻止前,未通过acks确认的发送请求最大数,也就是在Broker处排队等待acks确认的Message数量。所以刚才那个场景,第一条和第二条Message都在Broker那排队等待确认放行,这时第一条失败了,等重试的第一条Message再来排队时,第二条早都通过进去了,所以排序就乱了。 如果想在设置了retries还要严格控制Message顺序,可以把max.in.flight.requests.per.connection设置为1。让Broker处永远只有一条Message在排队,就可以严格控制顺序了。但是这样做会严重影响性能(接收Message的吞吐量)。

2.1.3、enable.idempotence【开启幂等】

开启生产者幂等配置。

2.2、解决消费者消息重复丢失 2.2.1、使用kafka手动提交

kafka支持自动提交和手动提交2种模式,我们使用手动提交,明确判断消息已经落入库中,在提交kakfa消息偏移。

二、消息堵塞 1、消费能力不足

加节点法适合对消息顺序不关注的场景。合理评估集群最大TPS,如果发现集群最大消费能力出现不足,要及时扩容节点。注意点:topic的分区数要大于等于消费节点数,否则会出现部分节点不消费。

2、消费能力递减

业务刚开始的时候集群环境都不会出现问题,随着数据规模变大。导致整个系统的反应变慢,导致消费能力减弱。解决办法:业务的性能边际永远要小于压测边际,否则重新评估系统

3、消费报错异常

如果没有做好异常捕获处理,异常可能会中断消费线程,导致消费无法进行导致消息堵塞。解决办法:消费者测试要多场景测试(并发情况下、异常情况下、正常情况下)

4、消费超时异常【重点】 4.1、消费者超时

kafka消费者采用批量拉取的方式,一次拉取一批记录来消费,如果消费者线程消费超时会导致整批消息的回滚,从而导致已经消费过的数据再消费一遍,消费者不幂等会出大问题,这也是为啥要强调使用消费队列一定要考虑幂等性的原因。细思极恐的是:这次在规定的时间内消费不完,你能保证下次就能消费完吗?有可能进入拉取->超时->回滚->拉取的无限循环中.  原因:集群以为消费线程挂了,触发了rebanlance(这一批已经给别的消费者线程消费了)。当前消费者线程业务逻辑执行完了再去同步游标报错了,没有提交成功,这就导致了两个消费者线程把同一批消息消息了两遍。

4.2、关于消费超时涉及如下配置

max.poll.interval.ms   拉取时间间隔    默认值:300s    每次拉取的记录必须在该时间内消费完max.poll.records   每次拉取条数    默认值:500条    这个条数一定要结合业务背景合理设置fetch.max.wait.ms  每次拉取最大等待时间        时间达到或者消息大小谁先满足条件都触发,没有消息但时间达到返回空消息体fetch.min.bytes  每次拉取最小字节数        时间达到或者消息大小谁先满足条件都触发heartbeat.interval.ms  向协调器发送心跳的时间间隔    默认值:3s    建议不超过session.timeout.ms的1/3session.timeout.ms  心跳超时时间    默认值:30s    配置太大会导致真死消费者检测太慢

4.3、max.poll.interval.ms 和session.timeout.ms区别

KIP-62(kafka规范)前只有session.timeout.ms参数 KIP-62后不通过poll()方法发送心跳,而是后台另起一个心跳线程,这就允许单次poll处理更长时间。不会因为单次处理超时假死引发不必要的rebanlancemax.poll.interval.ms 检测消费者处理线程死亡session.timeout.ms 检测整个消费者死亡

4.4、max.poll.records和max.poll.interval.ms如何配置

一定要结合具体业务背景,预估消费能力,合理设置【max.poll.records】和【max.poll.interval.ms】。 默认情况下max.poll.records=500条,max.poll.interval.ms=300秒, 也就是说300s必须消费完500条,否则超时回滚!

将【session.timeout.ms】设置为【heartbeat.interval.ms】的三倍。即连续三次收不到心跳认为消费者挂了。

4.5、poll(long timeout)中timeout和max.poll.interval.ms区别

poll(long timeout)如果消费者从buffer中经历timeout毫秒后拉不到数据,就返回个空消息。 max.poll.interval.ms每次拉取的记录必须在该时间内消费完。

三、消息顺序问题

我们都知道kafka同一个Partition下支持顺序消息。如果kafka单分区消费性能出现瓶颈如何解决?技术解决方案:我觉得这个问题不适合通过基础手段解决,可能我们能想到无数种单分区扩容策略,但是我们保证不了他的健壮性。 业务解决方案:我们可以通过对这类业务划分不同的业务域把大数据变成小数据解耦。 比如:kafka要承接全世界的购物日志,可以把日志按照(街道、区、市、省)划分。

四、topic不停rebalance

五、kafka事务消息

六、做好运维监控

er消息顺序问题               指定分区发送:如果kafka消息需要保证顺序,topic消息发送到kafka指定的分区,因为kafka分区内部,消息是有序的,但是牺牲并发性。 单一生产者和broker场景,max.in.flight.requests.per.connection = 1可以保证顺序不常用。           3. kafka丢失或者重复发送数据                   发送最多一次:禁止Producer消息重试                   发送至少一次:kafka Producer发送消息如果遇到网络抖动,会重试再次发送【kafka默认策略】                   发送精确一次:幂等性Produce、事务型Producer保证精确一次发送,但是效率相对低,支持此策略在kafka0.11.0.0版本后    4. kafka消费丢失或者重复消费数据        自动提交        重复消费:当数据已经被处理,然后自动提交offset时消费者出现故障或者有新消费者加入组导致再均衡,这时候offset提交失败,导致这批已经处理的数据的信息没有记录,后续会重复消费一次        丢失数据:如果业务处理时间较长一点,这时候数据处理业务还未完成,offset信息已经提交了,但是在后续处理数据过程中程序发生了崩溃,导致这批数据未正常消费,这时候offset已经提交,消费者后续将不在消费这批数据,导致这批数据将会丢失        手动提交            重复消费(最少一次消费语义实现):消费数据处理业务完成后进行offset提交,可以保证数据最少一次消费,因为在提交offset的过程中可能出现提交失败的情况,导致数据重复消费    5. kafka消息在topic保存时间            kafka的消息在kafka保存时间由log.retention.xx、retention.ms、segment.ms决定,故消息需要在这几个时间前消费完毕,不然消息会丢失    6. 无法生产消费消息            1. 集群设置问题:advertised.host.name=PLAINTEXT://服务器IP:9092            2.消息过大:【kafka参数】               broker                   message.max.bytes : 这是单个消息的最大值                   replica.fetch.max.bytes :这是单个消息的最大值broker可复制的消息的最大字节数,比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失,但是不能太大,不然复制的时候容易内存溢出              consumer                   fetch.message.max.bytes:这是消费者能读取的最大消息,大于或等于message.max.bytes                  producer                   max.request.size:这是生产者能请求的最大消息,大于或等于message.max.bytes              kafka jvm参数调整:                   堆内存:kafka-server-start.sh文件的 export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"     7. kafka无法及时消费消息               堵塞:kafka消费能力不足或者offset回滚到历史消息,kafka消息可能发生堵塞导致无法及时消费消息     8. 导致kafka rebalance原因及影响               什么是rebalance: rebalance是kafka集群自我保护的一种策略,当组内消费者、生产者、topic分区发生变化              导致rebalance原因:                   heartbeat.interval.ms设置过大                  kafka consumer心跳周期设置过大,导致kafka接收客户端超时导致平凡rbalance,合理设置:    heartbeat.interval.ms必须设置时间为session.timeout.ms的1/3内。 max.poll.interval.ms(poll时间间隔)设置过小或者max.poll.records(poll记录数)设置过大                当kafka consumer在指定的时间无法处理掉poll的消息,Consumer会主动发起 “离开组” 的请求。       9. kafka Controller作用及Controller挂掉会怎么样?【controller知识、controller源码分析】               Controller作用:                     管理副本和分区的状态                     更新集群元数据信息                      创建、删除 topic                      分区重分配                      leader 副本选举                      topic 分区扩展                      broker 加入、退出集群                      受控关闭               Controller 选举               Controller挂掉:                              kafka的每台broker都会watcher,zk的/controller节点并与其建立唯一sessionId,当/controller节点丢失,每台broker会尝试成为/controller,竞争成功成为新的controller。【对zk watcher机制不了解可以查看zk.delete.ZkWatchTest.java】       10. zk为kafka做了什么                维护管理broker、consumer、controller动态加入离开                记录组offset偏移量                负责kafka组内消费者、分区负载平衡                leader 副本选举                topic 分区扩展                 broker 加入、退出集群                 受控关闭 controller leader 选举       11. kafka速度为什么那么快?【kafka高速读写】               Broke异步消息处理:               磁盘顺序写:                    文件存放特征:                           连续存储:文件存储在硬盘中是连续的,好处读写效率高,坏处硬盘利用率低                           非连续存储:文件存储在硬盘通过逻辑连续在一起,好处磁盘利用率高,坏处读写效率低                     kafka连续存储: 由于机械硬盘的特征,旋转磁盘寻址导致顺序读写,效率远远高于随机读写,(顺序随机读写测试)但是顺序写导致kafak无法删除数据,kafka解决办法是通过offset偏移标示读取到第一条数据。 【java实现顺序读写】               page Cache支持:                      java程序写入磁盘流程:                            应用程序--->jvm堆内存--->jvm堆外内存--->系统内核--->系统page cache--->回写硬盘                      page是什么:【page cache是什么、Linux内核漫游记、操作系统与内存管理、pagecache百科、kafka page cache、Linux pagecache存储机制】 page cache是操作系统在物理内存中定义的一片存储空间,当应用在磁盘写入数据的时候,并不是直接写入磁盘,而是写入page Cache,然后page Cache触发回写机制写入磁盘。                      page cache好处:【page cache回写机制测试】                          避免GC:由于kafka是基于java和Scala语言开发的,用page cache避免对象的GC,提高效率。                          数据更安全:如果kafka程序奔溃,由于kafka数据存储在pagecache不会丢失数据。                          零拷贝:同一份数据在内核 Buffer 与用户 Buffer 之间多次copy。               零拷贝:【java实现零拷贝功能】                   零拷贝通过共享内存缓冲区的方式,降低数据用户缓冲区到内核缓冲区的copy。       12. kafka存储结构是什么?【美团kafka存储机制】                kafka日志落盘流程:                      fetcher- index-Segment-partiton-topic【拉取消息-合成索引-组成日志段-N个分区日志段-topic日志段】          kafka在磁盘存储结构:                      segment: segment是记录kafka分区落地数据的文件,其结构如下                          index:index是记录kafka日志位置的索引                          log:log是记录日志文件的实体文件                查看segment日志命令:                          ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /home/admin/kafka-data/__consumer_offsets-32/00000000000675817830.log --print-data-log                                 kafka在zk存储结构:                  admin节点:                   delete_topics : 记录删除的topic               broker节点:                    ids:记录broker节点信息 信息如下:{"jmx_port":9999,"timestamp":"1598337253639","endpoints":["PLAINTEXT://10.213.3.12:9092"],"host":"10.213.3.12","version":3,"port":9092}                   seqid:处理broker.id自动生成场景使用,在seqid基础加一当broker.id              topics集合节点:                        topic:                         partition:                            state: 记录分区信息 例如:{"controller_epoch":382,"leader":1,"version":1,"leader_epoch":223,"isr":[2,1]}               consumer节点:               controller节点:记录集群controller信息。例如:{"version":1,"brokerid":3,"timestamp":"1597901186010"}                  controller_epoch节点:记录controller年代信息                    config节点:记录配置信息                                    13. kafka如何写入消息?               producer-->分区路由-->zk state节点查询leader-->发送消息到leader-->系统page cache-->segment日志-->followers同步{可能作用在page cache或者文件系统}-->followers 发ack信号 14. kafka如何消费消息?              consumer-->消息订阅{topic订阅、topic分区订阅}-->消费起始点-->循环poll{poll-->N*分区fetch-->可能作用在page cache或者文件系统}-->commit{手动提交、自动提交}-->offset偏移修改{consumer_offsets}-->Rebalance{重新选择消费分区} 15.kafka controller选举、分区leader选举?         controller选举:【watch原理】               kafka controller选举是有zk watcher机制实现,其写入节点由zk 节点事务性保证的。         分区leader选举:分区leader选            kafka选择分区算法:【算法一、算法源码解析】 NoOpLeaderSelecto:             啥也不做  OfflinePartitionLeader     触发场景:     * 新创建topic     * PartitionStateMachine启动     * broker启动时     * ReplicaStateMachine检测到broker的znode“被删除”     选举:     1) Isr列表中有存货的replica,直接选出     2) 否则,unclean.leader.election.enable 为false,抛出异常     3) 存活的ar中有replica,选出,否则抛出异常  ReassignedPartitionLeader 触发场景:     * znode节点LeaderAndIsr发生变化     * Broker启动时     * zknode节点/admin/reassign_partitions变动     * 新产生controller时 选举:     * 新设置的ar中,存在broker存活的replica且replica在isr中则选出为leader,否则抛出异常 PreferredReplicaPartitionLeader 触发场景:     * znode节点/admin/preferred_replica_election写入相关数据     * partition-rebalance-thread线程进行触发reblance时     * 新产生controller 选举 :     1) AR中取出一个作为leader,如果与原有leader一样,抛出异常     2) 新leade的replica的broker存活且replica在isr中,选出,否则抛出异常 ControlledShutdownLeader 触发场景:     * kafka的broker进程政策退出发送消息给controller,controller触发 选举:     * 在isr列表中的选出存活的replica,否则抛出异常 16. kafka无法启动报错,报错日志:Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f50f94f6000, 65536, 1) failed; error='Cannot allocate memory' (errno=12)         原因:调大max_map_count参数,max_map_count是连续虚拟内存的个数。

参考文档

1.kafka优秀博文推荐

十、kafka名词解释

介绍kafka常见问题前,先了解kafka常见关键名词意识方便理解。

1、Broker

消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。

2、Topic

一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。

3、Partition

topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。

4、Segment

partition物理上由多个segment组成,下面2.2和2.3有详细说明

5、index

记录Segment日志位置索引文件

6、log

记录Segment日志实体文件

7、offset

每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序   列号叫做offset,用于partition唯一标识一条消息.

8、fetch

kafka消费请求的最小单元

9、poll

一个poll包含多个fetch

10、commit offset

kafka提交偏移量到zk

11、Consumer Group

实现记录kafka offset的消费位置的不同游标

12、rebalance

kafka集群平衡

13、ISR

kafka分区维护与Replica平衡的列表。

                      

                



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3