kafka 命令行命令大全

您所在的位置:网站首页 kafka创建消费组命令 kafka 命令行命令大全

kafka 命令行命令大全

2023-08-24 10:04| 来源: 网络整理| 查看: 265

kafka 脚本 connect-distributed.sh connect-mirror-maker.sh connect-standalone.sh kafka-acls.sh kafka-broker-api-versions.sh kafka-configs.sh kafka-console-consumer.sh kafka-console-producer.sh kafka-consumer-groups.sh kafka-consumer-perf-test.sh kafka-delegation-tokens.sh kafka-delete-records.sh kafka-dump-log.sh kafka-leader-election.sh kafka-log-dirs.sh kafka-mirror-maker.sh kafka-preferred-replica-election.sh kafka-producer-perf-test.sh kafka-reassign-partitions.sh kafka-replica-verification.sh kafka-run-class.sh kafka-server-start.sh kafka-server-stop.sh kafka-streams-application-reset.sh kafka-topics.sh kafka-verifiable-consumer.sh kafka-verifiable-producer.sh trogdor.sh windows zookeeper-security-migration.sh zookeeper-server-start.sh zookeeper-server-stop.sh zookeeper-shell.sh 集群管理 # 启动zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties & # 停止zookeeper bin/zookeeper-server-stop.sh # 前台启动broker Ctrl + C 关闭 bin/kafka-server-start.sh /server.properties # 后台启动broker bin/kafka-server-start.sh -daemon /server.properties # 关闭broker bin/kafka-server-stop.sh topic相关 # 创建topic(4个分区,2个副本) bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test # kafka版本 >= 2.2 bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test --create:指定创建topic动作 --topic:指定新建topic的名称 --zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样 --partitions:指定当前创建的kafka分区数量,默认为1个 --replication-factor:指定每个分区的复制因子个数,默认1个 # 分区扩容,注意:分区数量只能增加,不能减少 # kafka版本 < 2.2 bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2 # kafka版本 >= 2.2 bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2 # 删除topic bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test # 查询topic列表 bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list # 查询topic列表(支持0.9版本+) bin/kafka-topics.sh --list --bootstrap-server localhost:9092 # 查看所有topic的详细信息 bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe # 查询topic详情 bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topicname 消费者组

查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要区分指定bootstrap–server和zookeeper参数

# 新消费者列表查询(支持0.9版本+) bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list # 消费者列表查询(支持0.10版本+) bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list # 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的) bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test # 显示某个新消费组的消费详情(0.9版本 - 0.10.1.0 之前) bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group my-group ## 显示某个消费组的消费详情(0.10.1.0版本+) bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group # 重设消费者组位移 # 最早处 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-earliest --execute # 最新处 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-latest --execute # 某个位置 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-offset 2000 --execute # 调整到某个时间之后的最早位移 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-datetime 2019-09-15T00:00:00.000 # 删除消费者组 bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --delete --group groupname

显示某个消费组的消费详情 显示某个消费组的消费详情 各字段含义如下

TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-IDtopic名字分区id当前已消费的条数总条数未消费的条数消费id主机ip客户端id 发送和消费 # 生产者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test # 消费者,其中"--from-beginning"为可选参数,表示要从头消费消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning # 指定groupid bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning --consumer-property group.id=old-consumer-group # 指定分区 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning --partition 0 # 新生产者(支持0.9版本+) bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties # 新消费者(支持0.9版本+) bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties # kafka-verifiable-consumer.sh(消费者事件,例如:offset提交等) bin/kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic test --group-id groupName # 高级点的用法 bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234 --max-messages 10 切换leader # kafka版本 increase-replication-factor.json


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3