kafka传递文件 |
您所在的位置:网站首页 › kafka权威指南2 › kafka传递文件 |
可靠的数据传递 可靠性保证 - kafka可以保证分区消息的顺序 - 只有当消息被写入分区的所有同步副本时,才被认为是已提交的 - 只要还有一个副本是活跃的,那么已经提交的消息就不会丢失 - 消费者只能读取已经提交的消息 消息存储的可靠性和一致性的重要程度与可用性、高吞吐量、低延迟和硬件成本之间的权衡 复制 kafka复制机制和分区多副本架构是可靠性保证的核心 同步分区要满足的条件 - 与zookeeper间有一个活跃的会话,在过去6s内发送过心跳 - 过去10s内从首领获取过消息 - 过云10s内从首领获取过最新消息,即几乎是零延迟 一个滞后的同步副本会导致生产者和消费者变慢,因为消息被认为已提交前,需要得到同步副本的确诊 非同步副本不会影响性能,但更少的同步副本意味着更低的有效复制系统 broker配置 影响消息存储可靠性的参数,这些参数即可应用于全局,也可应用于特定的主题 1、复制系数 主题级别:replication.factor broker级别:default.replication.factor 即分区副本个数,默认3副本 broker.rack 可以为每个broker配置所在机架的名字,此时kafka会保证分区的副本被分布在多个机架上 2、不完全的首领选举 broker级别:unclean.leader.election 默认true 在选举首领副本过程中没有丢失数据,即提交的数据同时存在于所有同步副本上,好么选举是完全的 如果在首领不可用时,其他副本都不同步,就是不完全 注意:允许不同步的跟随副本提升为首领副本,就要承担丢失数据和数据不一致的风险;若不允许它成为首领,则要接受较低可用性的风险(需要等待原首领恢复到可用状态) 3、最少同步副本 主题级别:min.insync.replicas broker级别:min.insync.replicas 如果同步副本数量小于最小值,broker会停止接受生产者的请求,消费者仍然可正常读取已有数据 (只读) 在可靠的系统里使用生产者 可能存在的问题 - acks设置为1,首领收到消息后,崩溃,消息还没有同步到同步副本时 - acks设置为all,客户端发送后首领崩溃,但客户端没有正确处理"首领不可用"异常,并认为消息已正常投递 发送确认 acks=0 一定会丢失消息,但可得到惊人的吞吐量和带宽利用率 acks=1 风险点,客户端没有正确处理 LeaderNotAvailableException 异常;首领同步消息时崩溃 acks=all 配合min.insync.replicas参数,最保险,但会降低吞吐量 配置生产者的重试参数 broker返回的错误分为两种:可通过重试解决和不能通过重试解决 对于重试次数,需要根据实际情况进行设置 (MirrorMaker的重试策略是无限重试,决不丢失消息) 重试发送已经失败的消息会带来风险,如导致消息重复 重试和恰当的错误处理可保证每个消息"至少被保存一次",但无法保证"只被保存一次" 解决办法是为每条消息加入唯一标识符 和 应用程序做到"幂等" 额外的错误处理 不可重试的错误:消息大小错误、认证错误、序列化错误、重试次数上限或内存达到上限时的错误 在可靠的系统里使用消费者 跟踪哪些消息是已经读取过的,哪些是还没有读取过的 消费者提交了偏移量却未能处理完消息,就有可能造成消息丢失 消费者的可靠性配置 - group.id - auto.offset.reset 在没有偏移量或偏移量无效时,客户端的行为 latest/earliest - enable.auto.commit 是否自动提交偏移量,自动提交无法控制重复处理消息 - auto.commit.interval.ms 自动提交频率 显式提交偏移量 1、总是在处理完事件后再提交偏移量 2、提交频率是性能和重复消息数量之间的权衡 3、确保对提交的偏移量心里有数 是读取到的最新偏移量还是处理过的最新偏移量 4、再均衡 5、消费者可能需要重试 6、消费者可能需要维护状态 7、长时间处理 8、仅一次传递 幂等性写入 验证系统可靠性 1、配置验证 考虑首领选举、控制器选举、依次重启、不完全首领选举情况下的状态 2、应用程序验证 考虑客户端从服务器断开连接、首领选举、依次重启broker、依次重启消费者、依次重启生产者情况下的状态 3、在生产环境监控可靠性 对生产者,error-rate和retry-rate 及错误日志 对消费者,consumer-lag 构建数据管道 可作为各数据段间大型缓冲区,有效解耦管道数据的生产者和消费者 构建数据管道时需要考虑的问题 1、及时性 2、可靠性 kafka本身支持"至少一次传递",再结合具有事务模型或唯一键特性的外部存储系统,kafka也能实现"仅一次传递" 3、高吞吐量和动态吞吐量 kafka和connect api与数据格式无关,生产者和消费者可以使用各种序列化器来表示任意格式的数据 ETL 提取-转换-加载 转换过程由数据管道完成 ELT 提取-加载-转换 转换过程由下游应用程序完成,高保真 Kafka支持加密传输数据,还支持认证和授权,并能提供审计日志 如果数据管道过多地处理数据,会给下游系统造成一些限制和束缚,如果下游系统有新的需求,数据管道就要做相应的变更 更灵活的方式是尽量保留原始数据的完整性,让下游应用自己决定如何处理和聚合数据 Connect API和客户端API 客户端是需要被内嵌到应用程序里的 Connect用于从外部数据存储系统读取数据,或将数据推送到外部系统,是独立于外部系统的一个中间适配器 Kafka Connect 为连接器插件提供一组API和运行时 # 启动Connect bin/connect-distributed.sh config/connect-distributed.properties # 通过API查看已安装的connector插件 curl http://localhost:8083/connector-plugins # 单机模式 bin/connect-standalone.sh +配置参数 文件通过数据管道传输示例: 1、启动一个分布式worker进程 bin/connect-distributed.sh config/connect-distributed.properties & 2、启动一个文件数据源 (把server.properties发送到主题kafka-config-topic,连接器名称load-kafka-config) echo '{"name":"load-kafka-config","config":{"connector.class":"FileStreamSource","file":"config/server.properties","topic":"kafka-config-topic"}}' | \ curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json" 3、消费刚刚发到kafka的消息 bin/kafka-console-consumer.sh --new --bootstrap-server=localhost:9092 --topic kafka-config-topic --from-beginning #==== 接收文件 1、启动一个文件数据池 echo '{"name":"dump-kafka-config","config":{"connector.class":"FileStreamSlink","file":"copy-of-server-properties","topics":"kafka-config-topic"}}' | \ curl -X POST -d @- http://local-host:8083/connectors --header "content-Type:application/json" # 会生成文件copy-of-server-properties内容和源文件相同 2、删除连接器 curl -X DELETE http://localhost:8083/connectors/dump-kafka-config 示例,MySQL -> ElasticSearch page128(109) https://github.com/confluentinc/kafka-connect-elasticsearch https://github.com/confluentinc/kafka-connect-jdbc 深入理解Connect worker进程集群 1、连接器和任务 连接器 决定需要运行多少个任务;按任务拆分数据复制;从worker进程获取任务配置 # 如果通过REST API启动连接器,可能会启动任意节点上的连接器,连接器的任务会在该节点上执行 2、任务 任务负责将数据移入或移出kafka 3、worker进程 是连接器和任务的容器 4、转化器和Connect的数据模型 用户配置worker进程(或连接器)时,可选择使用合适的转化器,用于将数据保存到kafka 可用的转化器有:avro、json、string 5、偏移量管理 源连接器返回给worker进程的记录里包含一个逻辑分区和一个逻辑偏移量 (非kafka的分区和偏移量) 其他选择 1、基于图形界面的ETL工具 kafka可以是一个支持数据集成(使用Connect)、应用集成(使用生产者和消费者)和流式处理平台 kafka可以成为ETL工具的替代品(ETL工具太过复杂) 2、流式处理框架 数据集成系统应该只做一件事:传统数据;可靠性是数据集成系统唯一一个重要需求 跨集群数据镜像 集群间的数据复制叫做镜像,kafka内置的跨集群复制工具是 MirrorMaker 跨集群镜像的使用场景 - 区域集群和中心集群 - 冗余 DR - 云迁移 跨数据中心通信的现实情况 - 高延迟 - 有限的带宽 - 高成本 因为kafka服务器和客户端是按照单个数据中心进行设计、开发、测试和调优的,默认的超时时间和缓冲区大小不适用于跨多个数据中心安装kafka服务器 向远程数据中心生成数据要忍受高延迟,且需增加重试次数和增大缓冲区以解决潜在的网络问题 Hub和Spoke架构 该架构适用于一个中心kafka集群对应多个本地kafka集群的情况 数据只会在本地数据中心生成,且每个数据中心的数据只会被镜像到中央数据中心一次 不足:一个数据中心的应用程序无法访问另一个数据中心的数据 双活架构 Active-Active 两个或多个数据中心需共享数据且每个数据中心都可以生产和读取数据 优势:可就近为用户提供服务,具有性能优势,不会因数据可用性问题在功能方面作出牺牲;冗余和弹性 不足:如何进行多个位置数据异步读取和异步更新时避免冲突;数据一致性问题;循环复制 主备架构 Active-Standby 优势:易于实现,可用于任何场景 不足:浪费了一个集群 失效备援 要实现不丢失数据或无重复数据的kafka集群失效备援是不可能的 - 数据丢失和不一致性 镜像是异步的,会落后源一定时的消息 - 失效备援之后的起始偏移量 -- 偏移量自动重置,直接从头或末尾开始消费消息,存在消息重复消费和丢失的问题 -- 复制偏移量主题,__consumer_offsets 需要注意的是,两边的实际偏移量可能并不一致 (需要从0开始镜像数据并持续镜像偏移量主题) -- 基于时间的失效备援 (新版kafka支持) -- 偏移量外部映射 非常复杂 kafka客户端只需要连接一个broker,就可以获取到整个集群的元数据,并发现集群里的其他broker,一般提供3个broker信息就可以了 延展集群 stretch cluster 跨多个数据中心安装单个kafka集群 优势:同步复制;此外,所有broker都发挥了作用 不足:应对灾难类型很有限;运维复杂 需要3个数据中心,因为zookeeper要求大多数节点用用时,整个集群才可用 MirrorMaker MirrorMaker为每个消费者分配一个线程,消费者从源集群的主题和分区上读取数据,然后通过公共生产者将数据发送到目标集群 MirrorMaker进程若发生崩溃,默认最多出现60秒的得复数据 能尽可能做到仅一次传递 MirrorMaker示例 bin/kafka-mirror-maker --consumer.config etc/kafka/consumer.properties --producer.config etc/kafka/producer.properties --new.consumer --num.streams=2 --whitelist ".*" 配置说明 consumer.config # 所有消费者共用该配置,所有消费者属于同一个消费者群组 # auto.commit.enable=false 通常不需修改,若修改该参数,可能导致数据丢失 # auto.offset.reset一般需要修改,默认是latest(仅对MirrorMaker启动之后到达集群的数据进行镜像),根据情况可修改为earliest producer.config 生产者的配置文件,唯一必选的参数是 bootstrap.servers new.consumer 选择consumer的版本,建议使用更新版本的消费者,因为更稳定 num.streams 消费者数量 whitelist 正则表达式,代表需要进行镜像的主题名,所有与表达式匹配的主题都可以被镜像 在生产环境部署MirrorMaker 可以在docker容器里运行MirrorMaker MirrorMaker是完全无状态的,也不需要磁盘存储(所有数据和状态都保存在kafka上) 单个实例的吞吐量受限于生产者,因为一个实例只有一个生产者,而要提高整体吞吐量,可部署多个实例 如果可能,尽量将MirrorMaker运行在目标数据中心 若网络出现问题,一个无法连接到集群的消费者比一个无法连接到集群的生产者要安全得多 如果跨数据中心流量需要加密,最好把MirrorMaker放在源数据中心 将MirrorMaker部署到生产环境时,需要对以下几项内容进行监控 - 延迟监控 -- 通过MirrorMaker提交到源集群的偏移量,默认有一分钟的延迟,因为1分钟提交一次,然后通过 kafka-consumer-groups工具获取该偏移值 -- 通过消费者JMX发布的最大延迟,只反映消费者读取的数据,没有考虑生产者是否成功将数据发送到目标集群 ## 如果MirrorMaker跳过或丢弃部分消息,上面两种方法都无法检测到 - 度量指标监控 -- 消费者:fetch-size-avg,fetch-size-max,fetch-rate,fetch-throttle-time-avg,fetch-throttle-time-max -- 生产者:batch-size-avg,batch-size-max,requests-in-flight,record-retry-rate -- 同时适用两者:io-ratio,io-wait-ratio - canary -- 每分钟向源集群特定主题发送一个事件,然后尝试从目标集群读取这个事件,在超过指定时间还没有读取到就触发报警 MirrorMaker调优 MirrorMaker集群的大小,取决于对吞吐量的需求和对延迟的接受程度 压测: 使用kafka-performance-producer工具,在源集群上制造负载,然后启动MirrorMaker对这个负载进行镜像,分别设置不同的消费者数,观察性能在哪个点开始下降 然后将 num.streams 的值设置为一个小于当前点的整数,竞而得到单个实例的最大吞吐量 Linux系统调优 如果MirrorMaker是跨数据中心运行的,可对linux网络进行优化 - 增加TCP缓冲区大小 net.core.rmem_default,net.core.rmem_max,net.core.wmem_default,net.core.wmem_max,net.core.optmem_max - 启用时间窗口自动伸缩 sysctl -w net.ipv4.tcp_window_scaling=1 或者添加到 /etc/sysctl.conf - 减少TCP慢启动时间 将/proc/sys/net/ipv4/tcp_slow_start_after_idle设为0 可参考:Sandra K.Johnson等人合著的Performance tuning for Linux servers 对生产者调优 - max.in.flight.requests.per.connection 默认只有一个处理中的请求,即生产者在发送下一个消息前,当前发送的消息必须得到目标集群确认 这是唯一能保证消息次序的方法,若不在乎消息次序,可增加该值以提高吞吐量 - linger.ms batch.size 若总是发送未填满的批次(batch-size-avg和batch-size-max的值总是比batch.size低),可增加latency.ms以尽量满批次 若都是满批次,可加大batch.size以加大批次大小 提升消费者吞吐量 - range 分区分配策略,但会导致分配不均,可以考虑改为round robin partition.assignment.strategy=org.apache.kafka.clients.consumer.RoudRobinAssignor - fetch.max.bytes 若指标fetch-size-avg和fetch-size-max的数值与fetch.max.bytes很接近,说明消费者读取的数据已经接近上限 此时可以增大fetch.max.bytes在每个请求里读取更多的数据 - fetch.min.bytes fetch.max.wait 若指标fetch-rate值很高,说明消费者发送的请求太多,且获取不到足够的数据 可调大这两个参数,这样消费者的每个请求可以获取到更多的数据 其他跨集群镜像方案 1、优步的uReplicator 随着主题和分区的增加及集群吞吐量的增长,MirrorMaker面临的问题 - 再均衡延迟 新加分区或消费者离线时 - 难以增加新主题 使用了正则匹配,当有新主题加入时,会发生再均衡,这往往是不可预期的 使用Apache Helix管理主题列表并分配给每个实例分区,避免了再均衡的问题 2、Confluent的Replicator Replicator解决的痛点 - 分散的集群配置 保证不同集群中配置的一致性 - 在集群管理方面面临的挑战 MirrorMaker本身的部署、监控和配置管理 Replicator不仅从kafka主题复制数据,还会从zk上复制主题的配置信息 喜欢 (1)or分享 (0) |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |