kafka 分组消费topic |
您所在的位置:网站首页 › kafka消息分组未分类 › kafka 分组消费topic |
![]() 十万个为什么 集群操作 Topic操作 生产者操作 消费者操作 ![]() 集群操作 ![]() 1.启动集群 kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties2.停止集群 kafka-server-stop.sh /usr/local/kafka/config/server.properties![]() Topic操作 ![]() 1.创建Topic kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 3 --topic wolf注意:副本数不可大于broker的数量。 Replication factor: 6 larger than available brokers: 3. 2.删除Topic kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic wolf注意:此处的删除操作只是将该topic标记为删除,并未真正的删除,且要创建一个同名的topic也不会成功。 Error while executing topic command : Topic 'wolf' already exists 如果打算删除重新创建,可以先修改 kafka/config/server.properties , 在文件的最后加入配置 delete.topic.enable=true 则此时执行删除命令将直接删除 3.修改Topic (1)增加分区数 kafka-topics.sh --bootstrap-server 127.0.0.1:2181 --alter --topic my_topic_name --partitions 40(2)修改配置 kafka-configs.sh --bootstrap-server 127.0.0.1:9092--entity-type topics --entity-name my_topic_name --alter --add-config x=y(3)修改过期时间 全局配置【server.properties 】 log.retention.hours=72log.cleanup.policy=delete单独配置 kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --alter --entity-name wolf --entity-type topics --add-config retention.ms=86400000查看设置: kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --describe --entity-name wolf --entity-type topics(4)修改副本因子 首先创建一个json文件,指定相关的配置 vim custom_replication.json{ "partitions": [{ "topic": "wolf", "partition": 0, "replicas": [1] }, { "topic": "wolf", "partition": 1, "replicas": [4] }, { "topic": "wolf", "partition": 2, "replicas":[2] } ], "version": 1}注意: partition:分片的编号 replicas:指定分片所分布broker的,broker id列表 执行修改副本 kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file /Users/zhaoqiang/Downloads/aa.json --execute //以不超多500M/s的速度进行数据迁移【此处的单位是B/s】kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file /Users/zhaoqiang/Downloads/aa.json --execute --throttle 50000000//--verify 用于分区分配的状态4.查询Topic (0)罗列所有Topic kafka-topics.sh --list --zookeeper 127.0.0.1:2181(1)查看具体topic详情【其中的数字是brokerId】 kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test(2)列出与集群的默认配置不同的topic kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topics-with-overrides(3)列出包含不同步副本的topic kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --under-replicated-partitions(4)列出leader不可用的副本 kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --unavailable-partitions(5)修改topic分区数 kafka-topics.sh --alter --zookeeper 127.0.0.1:2181 --partitions 4 --topic wolf(6)查看选举失败的Topic 分区 kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe|grep "Leader: -1"![]() 生产者操作 ![]() 1、生产消息 kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic fusion_center_monitor_metric![]() 消费者操作 ![]() 1、查看消息 (1)查看指定消费者分组消费过指定topic的消息【实时数据】 kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic fox(2)查看指定消费者分组消费过指定topic的消息【从第一条数据开始】 kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic log_mryx_intelligent_promotion --from-beginning --group wolf2、查看消息进度 kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups(3)将消费者组的偏移量导出到 offsets.txt kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect 127.0.0.1:2181--group gid --output-file offsets.txt3、消费组操作 (1)查看所有的消费者组 kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list(2)消费者组描述 kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group gid(3)消费者组中所有活跃成员的列表 kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group gid --memberskafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group gid --members --all-groups(4)消费者组中所有活跃成员及成员所对应的分区列表 kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups --members --verbose(5)消费者组状态 kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups --state(6)删除消费者组 kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --delete --group gid(7)重置消费者组偏移量 kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest --reset-offsets 需指定Topic --all-topics或--topic 执行选项 --(默认)以显示要重置的偏移量。 --execute:执行--reset-offsets过程。 --export:将结果导出为CSV格式。 执行方案--to-datetime :将偏移量重置为与datetime的偏移量。格式:“ YYYY-MM-DDTHH:mm:SS.sss”--to-earliest:将偏移量重置为最早的偏移量。--to-latest:将偏移量重置为最新偏移量。--shift-by :重置偏移,将当前偏移偏移“ n”,其中“ n”可以为正或负。--from-file:将偏移量重置为CSV文件中定义的值。--to-current:将偏移量重置为当前偏移量。--by-duration :将偏移量重置为从当前时间戳记的持续时间偏移量。格式:“ PnDTnHnMnS”--to-offset:将偏移量重置为特定偏移量。请注意,超出范围的偏移量将调整为可用的偏移量结束。例如,如果偏移量结束为10,偏移量请求为15,则实际上将选择偏移量为10。 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |