kafka

您所在的位置:网站首页 kafka集群配置详解 kafka

kafka

2023-02-22 09:17| 来源: 网络整理| 查看: 265

kafka官方文档 参考Kafka三款监控工具比较

1 查看kafka的版本

进入kafka所在目录,通过查看libs目录下的jar包。 2.11是scala的版本,2.0.0是kafka的版本。

测试环境 #systemctl start zookeeper #systemctl start kafkka

2 kafka的常用配置

Kafka使用属性文件格式的键值对进行配置。这些值可以从文件或以编程方式提供。

2.1 Broker配置 ************************* *****Importance: high advertised.listeners null auto.create.topics.enable true auto.leader.rebalance.enable true background.threads 10 broker.id -1 compression.type producer control.plane.listener.name null controller.listener.names null controller.quorum.election.backoff.max.ms 1000 controller.quorum.election.timeout.ms 1000 controller.quorum.fetch.timeout.ms 2000 controller.quorum.voters "" delete.topic.enable true early.start.listeners null leader.imbalance.check.interval.seconds 300 leader.imbalance.per.broker.percentage 10 listeners PLAINTEXT://:9092 log.dir /tmp/kafka-logs log.dirs null log.flush.interval.messages 9223372036854775807 log.flush.interval.ms null log.flush.offset.checkpoint.interval.ms 60000 log.flush.scheduler.interval.ms 9223372036854775807 log.flush.start.offset.checkpoint.interval.ms 60000 log.retention.bytes -1 log.retention.hours 168 log.retention.minutes null log.retention.ms null log.roll.hours 168 log.roll.jitter.hours 0 log.roll.jitter.ms null log.roll.ms null log.segment.bytes 1073741824 (1 gibibyte) log.segment.delete.delay.ms 60000 (1 minute) message.max.bytes 1048588 metadata.log.dir null metadata.log.max.record.bytes.between.snapshots 20971520 metadata.log.segment.bytes 1073741824 (1 gibibyte) metadata.log.segment.ms 604800000 (7 days) metadata.max.retention.bytes -1 metadata.max.retention.ms 604800000 (7 days) min.insync.replicas 1 node.id -1 num.io.threads 8 num.network.threads 3 num.recovery.threads.per.data.dir 1 num.replica.alter.log.dirs.threads null num.replica.fetchers 1 offsetadata.max.bytes 4096 (4 kibibytes) offsets.commit.required.acks -1 offsets.commit.timeout.ms 5000 (5 seconds) offsets.load.buffer.size 5242880 offsets.retention.check.interval.ms 600000 (10 minutes) offsets.retention.minutes 10080 offsets.topic.compression.codec 0 offsets.topic.num.partitions 50 offsets.topic.replication.factor 3 offsets.topic.segment.bytes 104857600 (100 mebibytes) process.roles "" queued.max.requests 500 replica.fetch.min.bytes 1 replica.fetch.wait.max.ms 500 replica.high.watermark.checkpoint.interval.ms 5000 (5 seconds) replica.lag.time.max.ms 30000 (30 seconds) replica.socket.receive.buffer.bytes 65536 (64 kibibytes) replica.socket.timeout.ms 30000 (30 seconds) request.timeout.ms 30000 (30 seconds) sasl.mechanism.controller.protocol GSSAPI socket.receive.buffer.bytes 102400 (100 kibibytes) socket.request.max.bytes 104857600 (100 mebibytes) socket.send.buffer.bytes 102400 (100 kibibytes) transaction.max.timeout.ms 900000 (15 minutes) transaction.state.log.load.buffer.size 5242880 transaction.state.log.min.isr 2 transaction.state.log.num.partitions 50 transaction.state.log.replication.factor 3 transaction.state.log.segment.bytes 104857600 (100 mebibytes) transactional.id.expiration.ms 604800000 (7 days) unclean.leader.election.enable false zookeeper.connect null zookeeper.connection.timeout.ms null zookeeper.max.in.flight.requests 10 zookeeper.session.timeout.ms 18000 (18 seconds) zookeeper.set.acl false ************************* *****Importance: medium broker.heartbeat.interval.ms 2000 (2 seconds) broker.id.generation.enable true broker.rack null broker.session.timeout.ms 9000 (9 seconds) connections.max.idle.ms 600000 (10 minutes) connections.max.reauth.ms 0 controlled.shutdown.enable true controlled.shutdown.max.retries 3 controlled.shutdown.retry.backoff.ms 5000 (5 seconds) controller.quorum.append.linger.ms 25 controller.quorum.request.timeout.ms 2000 (2 seconds) controller.socket.timeout.ms 30000 (30 seconds) default.replication.factor 1 delegation.token.expiry.time.ms 86400000 (1 day) delegation.token.master.key null delegation.token.max.lifetime.ms 604800000 (7 days) delegation.token.secret.key null delete.records.purgatory.purge.interval.requests 1 fetch.max.bytes 57671680 (55 mebibytes) fetch.purgatory.purge.interval.requests 1000 group.initial.rebalance.delay.ms 3000 (3 seconds) group.max.session.timeout.ms 1800000 (30 minutes) group.max.size 2147483647 group.min.session.timeout.ms 6000 (6 seconds) initial.broker.registration.timeout.ms 60000 (1 minute) inter.broker.listener.name null inter.broker.protocol.version log.cleaner.backoff.ms 15000 (15 seconds) log.cleaner.dedupe.buffer.size 134217728 log.cleaner.delete.retention.ms 86400000 (1 day) log.cleaner.enable true log.cleaner.io.buffer.load.factor 0.9 log.cleaner.io.buffer.size 524288 log.cleaner.io.max.bytes.per.second 1.7976931348623157E308 log.cleaner.max.compaction.lag.ms 9223372036854775807 log.cleaner.min.cleanable.ratio 0.5 log.cleaner.min.compaction.lag.ms 0 log.cleaner.threads 1 log.cleanup.policy delete log.index.interval.bytes 4096 (4 kibibytes) log.index.size.max.bytes 10485760 (10 mebibytes) log.message.format.version log.message.timestamp.difference.max.ms 9223372036854775807 log.message.timestamp.type CreateTime log.preallocate false log.retention.check.interval.ms 300000 (5 minutes) max.connection.creation.rate 2147483647 max.connections 2147483647 max.connections.per.ip 2147483647 max.connections.per.ip.overrides "" max.incremental.fetch.session.cache.slots 1000 num.partitions 1 password.encoder.old.secret null password.encoder.secret null principal.builder.class producer.purgatory.purge.interval.requests 1000 queued.max.request.bytes -1 replica.fetch.backoff.ms 1000 (1 second) replica.fetch.max.bytes 1048576 (1 mebibyte) replica.fetch.response.max.bytes 10485760 (10 mebibytes) replica.selector.class null reserved.broker.max.id 1000 sasl.client.callback.handler.class null sasl.enabled.mechanisms GSSAPI sasl.jaas.config null sasl.kerberos.kinit.cmd /usr/bin/kinit sasl.kerberos.min.time.before.relogin 60000 sasl.kerberos.principal.to.local.rules DEFAULT sasl.kerberos.service.name null sasl.kerberos.ticket.renew.jitter 0.05 sasl.kerberos.ticket.renew.window.factor 0.8 sasl.login.callback.handler.class null sasl.login.class null sasl.login.refresh.buffer.seconds 300 sasl.login.refresh.min.period.seconds 60 sasl.login.refresh.window.factor 0.8 sasl.login.refresh.window.jitter 0.05 sasl.mechanism.inter.broker.protocol GSSAPI sasl.oauthbearer.jwks.endpoint.url null sasl.oauthbearer.token.endpoint.url null sasl.server.callback.handler.class null sasl.server.max.receive.size 524288 security.inter.broker.protocol PLAINTEXT socket.connection.setup.timeout.max.ms 30000 (30 seconds) socket.connection.setup.timeout.ms 10000 (10 seconds) socket.listen.backlog.size 50 ssl.cipher.suites "" ssl.client.auth none ssl.enabled.protocols TLSv1.2,TLSv1.3 ssl.key.password null ssl.keymanager.algorithm SunX509 ssl.keystore.certificate.chain null ssl.keystore.key null ssl.keystore.location null ssl.keystore.password null ssl.keystore.type JKS ssl.protocol TLSv1.3 ssl.provider null ssl.trustmanager.algorithm PKIX ssl.truststore.certificates null ssl.truststore.location null ssl.truststore.password null ssl.truststore.type JKS zookeeper.clientCnxnSocket null zookeeper.ssl.client.enable false zookeeper.ssl.keystore.location null zookeeper.ssl.keystore.password null zookeeper.ssl.keystore.type null zookeeper.ssl.truststore.location null zookeeper.ssl.truststore.password null zookeeper.ssl.truststore.type null ************************* *****Importance: low alter.config.policy.class.name null alter.log.dirs.replication.quota.window.num 11 alter.log.dirs.replication.quota.window.size.seconds 1 authorizer.class.name "" client.quota.callback.class null connection.failed.authentication.delay.ms 100 controller.quorum.retry.backoff.ms 20 controller.quota.window.num 11 controller.quota.window.size.seconds 1 create.topic.policy.class.name null delegation.token.expiry.check.interval.ms 3600000 (1 hour) kafka.metrics.polling.interval.secs 10 kafka.metrics.reporters "" listener.security.protocol.map PLAINTEXT:PLAINTEXT log.message.downconversion.enable true metadata.max.idle.interval.ms 500 metric.reporters "" metrics.num.samples 2 metrics.recording.level INFO metrics.sample.window.ms 30000 (30 seconds) password.encoder.cipher.algorithm AES/CBC/PKCS5Padding password.encoder.iterations 4096 password.encoder.key.length 128 password.encoder.keyfactory.algorithm null quota.window.num 11 quota.window.size.seconds 1 replication.quota.window.num 11 replication.quota.window.size.seconds 1 sasl.login.connect.timeout.ms null sasl.login.read.timeout.ms null sasl.login.retry.backoff.max.ms 10000 (10 seconds) sasl.login.retry.backoff.ms 100 sasl.oauthbearer.clock.skew.seconds 30 sasl.oauthbearer.expected.audience null sasl.oauthbearer.expected.issuer null sasl.oauthbearer.jwks.endpoint.refresh.ms 3600000 (1 hour) sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms 10000 (10 seconds) sasl.oauthbearer.jwks.endpoint.retry.backoff.ms 100 sasl.oauthbearer.scope.claim.name scope sasl.oauthbearer.sub.claim.name sub security.providers null ssl.endpoint.identification.algorithm https ssl.engine.factory.class null ssl.principal.mapping.rules DEFAULT ssl.secure.random.implementation null transaction.abort.timed.out.transaction.cleanup.interval.ms 10000 (10 seconds) transaction.remove.expired.transaction.cleanup.interval.ms 3600000 (1 hour) zookeeper.ssl.cipher.suites null zookeeper.ssl.crl.enable false zookeeper.ssl.enabled.protocols null zookeeper.ssl.endpoint.identification.algorithm HTTPS zookeeper.ssl.ocsp.enable false zookeeper.ssl.protocol TLSv1.2 2.2 Topic配置 *****Importance: medium cleanup.policy delete compression.type producer delete.retention.ms 86400000 (1 day) file.delete.delay.ms 60000 (1 minute) flush.messages 9223372036854775807 flush.ms 9223372036854775807 follower.replication.throttled.replicas "" index.interval.bytes 4096 (4 kibibytes) leader.replication.throttled.replicas "" max.compaction.lag.ms 9223372036854775807 max.message.bytes 1048588 message.format.version 3.0-IV1 message.timestamp.difference.max.ms 9223372036854775807 message.timestamp.type CreateTime min.cleanable.dirty.ratio 0.5 min.compaction.lag.ms 0 min.insync.replicas 1 preallocate false retention.bytes -1 retention.ms 604800000 (7 days) segment.bytes 1073741824 (1 gibibyte) segment.index.bytes 10485760 (10 mebibytes) segment.jitter.ms 0 segment.ms 604800000 (7 days) unclean.leader.election.enable false *****Importance: low message.downconversion.enable true 2.3 Producer配置 ************************* *****Importance: high key.serializer value.serializer bootstrap.servers "" buffer.memory 33554432 compression.type none retries 2147483647 ssl.key.password null ssl.keystore.certificate.chain null ssl.keystore.key null ssl.keystore.location null ssl.keystore.password null ssl.truststore.certificates null ssl.truststore.location null ssl.truststore.password null ************************* *****Importance: medium batch.size 16384 client.dns.lookup use_all_dns_ips client.id "" connections.max.idle.ms 540000 (9 minutes) delivery.timeout.ms 120000 (2 minutes) linger.ms 0 max.block.ms 60000 (1 minute) max.request.size 1048576 partitioner.class null partitioner.ignore.keys false receive.buffer.bytes 32768 (32 kibibytes) request.timeout.ms 30000 (30 seconds) sasl.client.callback.handler.class null sasl.jaas.config null sasl.kerberos.service.name null sasl.login.callback.handler.class null sasl.login.class null sasl.mechanism GSSAPI sasl.oauthbearer.jwks.endpoint.url null sasl.oauthbearer.token.endpoint.url null security.protocol PLAINTEXT send.buffer.bytes 131072 (128 kibibytes) socket.connection.setup.timeout.max.ms 30000 (30 seconds) socket.connection.setup.timeout.ms 10000 (10 seconds) ssl.enabled.protocols TLSv1.2,TLSv1.3 ssl.keystore.type JKS ssl.protocol TLSv1.3 ssl.provider null ssl.truststore.type JKS ************************* *****Importance: low acks all enable.idempotence true interceptor.classes "" max.in.flight.requests.per.connection 5 metadata.max.age.ms 300000 (5 minutes) metadata.max.idle.ms 300000 (5 minutes) metric.reporters "" metrics.num.samples 2 metrics.recording.level INFO metrics.sample.window.ms 30000 (30 seconds) partitioner.adaptive.partitioning.enable true partitioner.availability.timeout.ms 0 reconnect.backoff.max.ms 1000 (1 second) reconnect.backoff.ms 50 retry.backoff.ms 100 sasl.kerberos.kinit.cmd /usr/bin/kinit sasl.kerberos.min.time.before.relogin 60000 sasl.kerberos.ticket.renew.jitter 0.05 sasl.kerberos.ticket.renew.window.factor 0.8 sasl.login.connect.timeout.ms null sasl.login.read.timeout.ms null sasl.login.refresh.buffer.seconds 300 sasl.login.refresh.min.period.seconds 60 sasl.login.refresh.window.factor 0.8 sasl.login.refresh.window.jitter 0.05 sasl.login.retry.backoff.max.ms 10000 (10 seconds) sasl.login.retry.backoff.ms 100 sasl.oauthbearer.clock.skew.seconds 30 sasl.oauthbearer.expected.audience null sasl.oauthbearer.expected.issuer null sasl.oauthbearer.jwks.endpoint.refresh.ms 3600000 (1 hour) sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms 10000 (10 seconds) sasl.oauthbearer.jwks.endpoint.retry.backoff.ms 100 sasl.oauthbearer.scope.claim.name scope sasl.oauthbearer.sub.claim.name sub security.providers null ssl.cipher.suites null ssl.endpoint.identification.algorithm https ssl.engine.factory.class null ssl.keymanager.algorithm SunX509 ssl.secure.random.implementation null ssl.trustmanager.algorithm PKIX transaction.timeout.ms 60000 (1 minute) transactional.id null 2.4 Consumer配置 ************************* *****Importance: high key.deserializer value.deserializer bootstrap.servers "" fetch.min.bytes 1 group.id null heartbeat.interval.ms 3000 (3 seconds) max.partition.fetch.bytes 1048576 (1 mebibyte) session.timeout.ms 45000 (45 seconds) ssl.key.password null ssl.keystore.certificate.chain null ssl.keystore.key null ssl.keystore.location null ssl.keystore.password null ssl.truststore.certificates null ssl.truststore.location null ssl.truststore.password null ************************* *****Importance: medium allow.auto.create.topics true auto.offset.reset latest client.dns.lookup use_all_dns_ips connections.max.idle.ms 60000 (1 minute) enable.auto.commit true exclude.internal.topics true fetch.max.bytes 52428800 (50 mebibytes) group.instance.id null isolation.level read_uncommitted max.poll.interval.ms 300000 (5 minutes) max.poll.records 500 partition.assignment.strategy receive.buffer.bytes 65536 (64 kibibytes) request.timeout.ms 30000 (30 seconds) sasl.client.callback.handler.class null sasl.jaas.config null sasl.kerberos.service.name null sasl.login.callback.handler.class null sasl.login.class null sasl.mechanism GSSAPI sasl.oauthbearer.jwks.endpoint.url null sasl.oauthbearer.token.endpoint.url null security.protocol PLAINTEXT send.buffer.bytes 131072 (128 kibibytes) socket.connection.setup.timeout.max.ms 30000 (30 seconds) socket.connection.setup.timeout.ms 10000 (10 seconds) ssl.enabled.protocols TLSv1.2,TLSv1.3 ssl.keystore.type JKS ssl.protocol TLSv1.3 ssl.provider null ssl.truststore.type JKS ************************* *****Importance: low auto.commit.interval.ms 5000 (5 seconds) check.crcs true client.id "" client.rack "" fetch.max.wait.ms 500 interceptor.classes "" metadata.max.age.ms 300000 (5 minutes) metric.reporters "" metrics.num.samples 2 metrics.recording.level INFO metrics.sample.window.ms 30000 (30 seconds) reconnect.backoff.max.ms 1000 (1 second) reconnect.backoff.ms 50 retry.backoff.ms 100 sasl.kerberos.kinit.cmd /usr/bin/kinit sasl.kerberos.min.time.before.relogin 60000 sasl.kerberos.ticket.renew.jitter 0.05 sasl.kerberos.ticket.renew.window.factor 0.8 sasl.login.connect.timeout.ms null sasl.login.read.timeout.ms null sasl.login.refresh.buffer.seconds 300 sasl.login.refresh.min.period.seconds 60 sasl.login.refresh.window.factor 0.8 sasl.login.refresh.window.jitter 0.05 sasl.login.retry.backoff.max.ms 10000 (10 seconds) sasl.login.retry.backoff.ms 100 sasl.oauthbearer.clock.skew.seconds 30 sasl.oauthbearer.expected.audience null sasl.oauthbearer.expected.issuer null sasl.oauthbearer.jwks.endpoint.refresh.ms 3600000 (1 hour) sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms 10000 (10 seconds) sasl.oauthbearer.jwks.endpoint.retry.backoff.ms 100 sasl.oauthbearer.scope.claim.name scope sasl.oauthbearer.sub.claim.name sub security.providers null ssl.cipher.suites null ssl.endpoint.identification.algorithm https ssl.engine.factory.class null ssl.keymanager.algorithm SunX509 ssl.secure.random.implementation null ssl.trustmanager.algorithm PKIX 2.5 Kafka Connect配置 2.6 Kafka Streams配置 2.7 AdminClient配置 3 KafkaOffsetMonitor

KafkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,我们可以浏览当前的消费者组,并且每个Topic的所有Partition的消费情况都可以一目了然。

KafkaOffsetMonitor,它是由Kafka开源社区提供的一款Web管理界面,是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,通过KafkaOffsetMonitor,我们可以很直观的知道,每个Partition的Message是否消费掉,有没有阻塞等。

通过web界面,可以方便的得知以下信息: (1)对Consumer的消费监控,并列出每个Consumer的Offset数据。 (2)保护消费者组列表信息。 (3)每个Topic的所有Partition列表包含:Topic、Pid、Offset、LogSize、Lag以及Owner等。 (4)浏览查阅Topic的历史消费信息。

3.1 下载

KafkaOffsetMonitor托管在Github上,可以通过Github下载。 下载地址:https://github.com/gunsid/KafkaOffsetMonitor.git。

在这里插入图片描述

3.2 启动

将下载下来的KafkaOffsetMonitor jar包上传到linux上,可以新建一个目录KafkaMonitor,用于存放KafkaOffsetMonitor-assembly-0.2.1.jar进入到KafkaMonitor目录下,通过java编译命令来运行这个jar包:

java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 --port 8088 --refresh 5.seconds --retain 1.days

如果没有指定端口,则默认会开启一个随机端口。

zk :zookeeper主机地址,如果有多个,用逗号隔开 port :应用程序端口 refresh :应用程序在数据库中刷新和存储点的频率 retain :在db中保留多长时间 dbName :保存的数据库文件名,默认为offsetapp

编辑启动脚本 #vi kafka-monitor-start.sh #chmod a+x kafka-monitor-start.sh

java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --port 8089 \ --zk 10.23.241.202:2181 \ --refresh 5.minutes \ --retain 1.day >/dev/null 2>&1;

#nohup /root/mm/kafka-monitor-start.sh & 启动 #netstat -lnp | grep 8089 查看端口占用情况

3.3 web UI

在游览器中输入:http://ip:port即可以查看KafkaOffsetMonitor Web UI。 http://10.23.241.202:8089/。 在这里插入图片描述



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3