Kafka及Kafka消费者的消费问题及线程问题

您所在的位置:网站首页 kafka不同组消费同一个topic Kafka及Kafka消费者的消费问题及线程问题

Kafka及Kafka消费者的消费问题及线程问题

2024-06-09 03:15| 来源: 网络整理| 查看: 265

Kafka 一、Kafka中的基本信息kafka中topic、broker、partition、及customer和producer等的对应关系Window安装配置kafka和zookeeper并将其加入服务的方式 二、 Kafka消费者的消费问题及线程问题kafka中不同topic使用同一个Group Id会出现的问题分析高效的消费一个Topickafka避免重复消费的思路kafka中Offset不更新的情况和处理思路kafka如何避免重复消费,offset不更新的情况 三、消费时的数据库与线程池springboot中所有涉及到数据库操作的程序使用同一个线程池,如何避免数据库Too Many Connections错误因为线程问题,数据库遇“1040:Too Many Connections” 错误解决方案 kafka基础学习一、Wget的用法一、消息队列的两种模式二、kafka基础架构(broker,partition和topic)三、zookeeper与Kafka的关系四、注意事项五、工具六、具体指令七、思考八、生产者的组成九、生产者DQuene的sender十、分区选择十一、提升生产者的吞吐量十二、生产者的消息发送是否会丢失,即生产者的数据可靠性,也可能出现数据重复十四、幂等性十五、kafka生产者的事务十六、数据库事务和kafka事务的区别十七、数据乱序十八、单分区有序的设置十九、Kafka中AR、OSR、ISR的区别二十、Kafka中的Follow故障

一、Kafka中的基本信息 kafka中topic、broker、partition、及customer和producer等的对应关系

Topic:是 Kafka 消息发布和订阅的基本单元,同时也是消息的容器。Topic 中的消息被分割成多个分区进行存储和处理。

Partition:是 Topic 分区,将 Topic 细分成多个分区,每个分区可以独立地存储在不同的 Broker 中,从而增加了消息的并发性、可扩展性和吞吐量。

Broker:是 Kafka 集群中的一个或多个服务器实例,每个 Broker 可以存在于一个或多个 Topic 的发布或订阅者列表中,并且可以跨集群多个 Brokers 进行消息路由和数据副本工作。

Producer:是指向 Kafka Topic 发送消息的客户端应用程序。Producer 通过向 Broker 发送消息来向 Kafka Topic 发布数据。

Consumer:是从 Kafka Topic 订阅消息的客户端应用程序。Consumer 通过消费者群组的方式订阅一个 Topic,并从 Broker 中读取消息进行消费。

Consumer Group:是指一个或多个 Consumer 实例的集合,共同订阅同一个 Topic 并共同消费其中的消息。Consumer Group 可以增加消费的吞吐量,并且可以实现负载均衡和故障自动转移。

Offset:是指在 Partition 中每个消息的唯一标识符。每条消息都有一个 offset,Kafka 使用这个 offset 来保证顺序和消息传递的可靠性。

Replication:是指在 Kafka 副本模式下所有数据被复制到多个 Broker,以提高数据的冗余性和可用性。

Broker 集群:是指由多个 Broker 组成的 Kafka 集群,用于提供高可用性和可扩展性的数据流处理和分发服务。Kafka 集群中的每个 Broker 都是具有一定功能的独立工作单元。

Window安装配置kafka和zookeeper并将其加入服务的方式 在kafka目录下的bin/windows文件下,在当前位置打开cmd,使用命令行启动zookeeper和kafka。 先启动zookeeper,在启动kafka

.\zookeeper-start.bat .\config\zookeeper.properties .\kafka-server-start.bat .\config\server.properties

使用nssm安装到服务后启动 进入nssm官网下载nssm.exe,地址见elk安装。 注意事项:配置文件properties,写在parameter处。 二、 Kafka消费者的消费问题及线程问题 kafka中不同topic使用同一个Group Id会出现的问题分析

在 Kafka 中,消费者组(consumer group)是一组逻辑上相同的消费者,它们共同消费订阅的 topic 中的消息。如果不同 topic 使用同一个 Group ID,那么它们将视为*同一组消费者,相当于每个消费者实例只能消费组中的一个主题,而其他主题的分区将不会被消费。同时,如果消息被放置在某个主题的分区中,在该主题下任何消费者组中的消费者都有可能消费到该消息,这可能导致数据重复消费或者数据消费不均,会降低整个消费群组的消费效率。因此,在实际应用中,应该为每个 topic 创建一个独立的 Group ID,以确保消费者能够正确地消费对应 topic 下的所有消息。

高效的消费一个Topic 批量消费:通过调整消费拉取的大小,可以将单次拉取到的消息量增加到一个较大的数量,从而减少通信开销和网络带宽使用,提高吞吐量。并行化消费:在拥有多个消费者实例的同一消费者组中,可以将每个消费者实例分配到不同的分区上,以实现并行消费。分区分配可以使用 Kafka 的自动分区分配功能。增量消费:消费者可以记录消费的 offset,下次消费从上一次消费到的 offset 开始,避免重复消费。合理的配置和优化:优化 Kafka 集群的配置,如调整未消费消息的保留时间,通过优化消费者端的参数设置,让消费者能够及时地拉取消息并处理。使用高级 API:Kafka 提供了 Consumer API 和 Streams API,可以适应不同的消费场景。 Consumer API 提供了一种较低级别的消费方式,这种方式需要消费者自己管理 offset、分区、并发等。 Streams API 则提供了一种更高级别的消费方式,这种方式封装了很多细节,比如处理流式数据的能力、失败恢复以及自我管理等。使用 Streams API 可以显著地加快消费速度。

通过对批量消费、并行化消费、增量消费、优化配置和使用高级 API 等方面的优化,可以高效地消费 Kafka 中的一个 topic。

kafka避免重复消费的思路

在 Kafka 中,为了避免重复消费消息,每个消费者会维护一个 Offset 的值,用来记录自己已经消费了哪些消息。Kafka 提供了三种 Offset 维护的方式,分别是:

手动维护 Offset:在这种方式下,消费者需要手动记录 offset 的值,并在消费结束后保存这个 offset,用于下一次消费的起点。自动提交 Offset:在这种方式下,Kafka 会自动定时提交 offset 的值,但由于自动提交是异步的,因此如果在消息消费期间消费者异常退出或者重启,就会导致 offset 丢失或重复提交。手动+自动维护 Offset:在这种方式下,除了内部维护的消费 offset 之外,还需要手动维护一个“checkpoint”,当消息消费成功后,将消费的 offset 保存在“checkpoint”中,以便于在消费者重启或异常退出后重新消费消息。 kafka中Offset不更新的情况和处理思路 消费者处理消息的时间太长,导致在下一次消费开始时,Offset 还没有被更新。消费者的重启或者异常退出导致 Offset 没有提交或者提交不完整。 kafka如何避免重复消费,offset不更新的情况 缩短消息处理的时间,以便在下一次消费开始时,Offset 已经更新。在消费者退出时,将 Offset 手动提交。使用事务来实现消息的消费和 Offset 提交,并始终在一起提交,这样即使在消费者异常退出时,就可以通过事务回滚来避免重复消费。 三、消费时的数据库与线程池 springboot中所有涉及到数据库操作的程序使用同一个线程池,如何避免数据库Too Many Connections错误

使用同一线程池会带来一个问题,就是当线程池内的线程数量过多时,可能会导致数据库因为连接数过多而出现 Too Many Connections 错误。为了避免这个问题,可以采用以下措施:

将线程池的最大连接数设置为合理的值。通过合理设置最大连接数,保证线程池内线程数量的合理性,并且避免连接过多导致数据库出现问题。使用数据库连接池来管理数据库连接。比如,Spring Boot 中默认使用的连接池是 HikariCP 连接池,它能够自动的管理连接的数量,预防连接池出现 Too Many Connections 错误。确保每个程序正确关闭数据库连接。无论是使用连接池还是手动创建连接,都必须正确的关闭连接,以便释放资源和避免连接池出现错误。 因为线程问题,数据库遇“1040:Too Many Connections” 错误解决方案 增加数据库连接数限制:可以在 MySQL 数据库中设置最大连接数限制,修改 My.cnf 文件,在 [mysqld] 下面加上如下参数:max_connections = 500检查访问数据库的代码:检查在程序中是否有未关闭连接的情况,如果有,需要手动关闭连接来释放资源。减少线程池数量:尝试减少线程池中线程数的数量,可以通过将线程池的 coreSize 或者 maximumPoolSize 参数设置为较小的值,以限制线程的数量。使用连接池:可以使用连接池来实现对数据库连接的管理,比如使用 HikariCP、Druid、C3P 等开源的连接池工具,可以自动地管理连接的数量,避免过多的连接数。使用分布式数据库:如果仍然存在高并发场景需要访问数据库,并且连接数过多的情况,可以考虑使用分布式数据库,将数据库中的数据进行分片存储,每个分片都使用单独的数据库实例,从而降低单个数据库的连接数量,提高数据库的可扩展性和性能。

通过检查访问数据库的代码、增加数据库连接数限制、使用连接池等多种方式来优化和改进数据库连接的管理。

kafka基础学习 一、Wget的用法 wget -O "rename" "url" //下载文件并重命名 wget "url" //直接下载文件 wget -c "url" //继续下载因为网络中断停止下载的文件 wget -b "url" //后台下载文件 wget –spider "url" //测试该网址的下载速度 一、消息队列的两种模式

1.点对点模式 消费者主动拉取数据,消息收到后消除。 2.发布/订阅模式 可以有多个topic主题(浏览、点赞、收藏、评论等) 消费者消费数据之后,不删除数据 每个消费者相互独立,都可以消费到数据 有定期清理规则,与消费者无关

二、kafka基础架构(broker,partition和topic)

1.为方便扩展,并提高吞吐量,一个topic分为多个partition 2.配合分区的设计,提出消费者组的概念,组内每个消费者并行消费 3.为提高可用性,为每个partition增加若干副本,类似NameNode HA 4.ZK中记录谁是leader,Kafka2.8.0以后也可以配置不采用ZK

三、zookeeper与Kafka的关系 必须在kafka全部关闭后才能关闭zookeeper,否则只能kill -9 四、注意事项

1.消费者只会消费建立连接后的数据 2.分区数只能增加,不能减少 3.不能通过命令行修改分区副本

五、工具

1.服务器配置文件分发工具 xsync 2.java进程查看工具 jps

六、具体指令

1.查看所有topic列表

kafka-topics.bat --bootstrap-server localhost:9092 --list

2.查看topic具体信息

kafka-topics.bat --bootstrap-server localhost:9092 --topic test --describe

3.生产者发送数据

kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test

4.消费者接收数据

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

5.消费者接收历史数据

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-brginning 七、思考

为什么kafka不使用java自带的serializer

八、生产者的组成

main线程 拦截器 序列化器 分区器-双端队列,内存池(内存的创建和销毁)

九、生产者DQuene的sender

数据积累,当数据量达到batch.size,才发送数据,batch.size默认大小是16K 时间积累,当数据量未达到batch。size的大小,sender等待linger.ms设置的大小,单位为ms,默认0ms,表示无延迟

十、分区选择

1.自己指定分区,填入指定的分区数 2.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值; 3.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition (黏性分区器),会随机选择一个分区,并尽可一直使用该分区,待该分区的batch已满或者已完成(即),Kafka再随机一个分区进行使用(和上一次的分区不同)。

十一、提升生产者的吞吐量

1.增大缓冲区 2.增加linger.ms 3.数据压缩snappy,zip等

十二、生产者的消息发送是否会丢失,即生产者的数据可靠性,也可能出现数据重复

通过对ACK应答级别进行设置,保证数据的可靠性 ACK的应答级别:-1,0,1 -1:生产者发送的数据,不需要等数据落盘应答 0:生产者发送的消息,Leader收到数据后应答 1:生产者发送过来的数据,leader和ISR队列里面所有节点收齐数据后应答 数据的可靠性保证: ACK级别为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2 应答级别的可靠性排序 -1>1>0

数据重复出现的情况 在同步完成,但是未应答时leader挂了,重复选举后又对数据进行同步 最少发送一次即ACK级别为-1+分区副本大于等于2+ISR里应答的最小副本数量大于等于2 最多一次即ACK级别为0

十四、幂等性 但是,要保证数据只发送一次,不多不少,即幂等性 Kafka生产者的幂等性是指,当生产者尝试重新发送之前已经发送过的消息时,Kafka可以保证这些消息只会被写入一次,而不会被写入多次。这种机制的实现可以避免以下场景: 网络中断,导致消息重发。 应答超时或者失败,导致消息重发。 重复发送同一消息,导致数据重复。 Kafka实现幂等性的方式是通过给每一条消息分配一个固定的序列号,这个序列号会被放入消息的header中进行传输。 当该消息被服务端接收并成功写入到Kafka分区中时,该序列号会被标记为已处理,以后再次发送相同序列号的消息时,生产者会在header中携带该消息曾经分配到的序列号,服务端就会根据序列号判断该消息是否已经被处理过,如果该序列号已经被标记为已经处理,则服务端会忽略该消息,从而避免数据重复写入。 生产者幂等性的实现对于数据一致性极为重要的场景非常有用,它可以避免生产者重复发送数据等问题,从而提高了系统的稳定性和可用性。 十五、kafka生产者的事务 Kafka事务是指多个Kafka消息的原子性批量提交,即所有消息要么全部提交成功,要么全部回滚。Kafka 0.11版本开始提供事务及ACID语义支持,Kafka 0.11版本之前没有提供对事务的正式支持。 Kafka的事务机制主要基于消息生产者API,通过引入两个新API:beginTransaction和commitTransaction来实现事务支持。startTransaction 用于在生产者端启动事务,beginTransaction则代表由应用程序开始了一批消息的处理。在所有消息处理完毕,确认消息发送成功之后,就可以调用 commitTransaction 方法完成事务。在调用 comitTransaction 方法时, Kafka会将所有成功发送的消息一起提交,如果有任何一个消息发送失败,则整个事务都会失败。 在事务提交之前,消息生产者将所有消息缓存在一个暂存池中,这个暂存池就是所谓的批次(batch),只有当消息发送成功并被确认后,批次中的消息才被视为全部发送成功。事务的过程中,消息的发送被视为不可见。只有当事务提交成功后才会对其他消费者可见。 Kafka 事务处理非常适合需要确保消息传输的可靠性,比如在分布式事务的场景下,可以保证分布式业务操作的可靠性。Kafka 事务的机制可以让我们以数据库事务的方式保证消息的可靠传输。 十六、数据库事务和kafka事务的区别 1.范围不同:Kafka的事务处理的对象是消息数据,而数据库的事务处理的对象是数据表中的数据。 2.原子性实现方式不同:Kafka的事务实现的原子性是通过消息的批量发送机制实现的,而数据库的事务是通过原子性提交或回滚操作来实现的。 3.并发性实现方式不同:Kafka的事务是基于分布式协调方式实现的,而数据库的事务是基于锁定机制实现的。 4.目的不同:Kafka的事务主要用于保证消息传输的可靠性,而数据库的事务主要用于保证数据操作的完整性和一致性。 5.调用方式不同:Kafka的事务是由生产者组织事务,通过生产者API的方式实现,而数据库的事务是由应用程序组织事务,通过数据库事务操作语句进行实现。 Kafka的事务和数据库的事务有很多相似之处,都是用来保证数据操作或传输的可靠性和一致性。但是,由于它们的应用场景和实现方式的不同,它们在具体实现上也有很大的差异。 十七、数据乱序 在单个分区中,数据是有序的。但是多个分区中数据的传输是无序的,消费者自行排序 十八、单分区有序的设置

在kafka 1.x前保证数据单分区有序,无需考虑幂等性 max.in.flight.requests.per.connection = 1 在kafka 1.x后保证数单分区有序的设置得考虑幂等性,最近的几个元数据都是有序的。 幂等性未开启:max.in.flight.requests.per.connection = 1 幂等性开启:max.in.flight.requests.per.connection = 5,必须设置小于等于5.

十九、Kafka中AR、OSR、ISR的区别

在Kafka中,AR、OSR和ISR都是与副本相关的术语。其中,AR是Assigned Replicas的缩写,指的是每个partition下所有副本(replicas)的统称;ISR是In-Sync Replicas的缩写,是指副本同步队列,ISR是AR中的一个子集;OSR是Out-of-Sync Replicas的缩写,是指与leader副本保持一定程度同步但未加入同步队列的副本,不包括leader副本。

AR=ISR+OSR。正常情况下,所有的follower副本都应该与leader副本保持同步,即AR=ISR,OSR集合为空。

二十、Kafka中的Follow故障

LEO是Last End Offset的缩写,指的是当前replica存的当前最大offset的下一个值;HW是High Watermark的缩写,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。

在Kafka中,HW是分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW。所以,HW、LW是分区层面的概念;而LEO、LogStartOffset是日志层面的概念;LSO是事务层面的概念。



【本文地址】


今日新闻


推荐新闻


    CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3