kafka的安装和使用(详细版) |
您所在的位置:网站首页 › kafka实际案例 › kafka的安装和使用(详细版) |
原创地址: https://www.cnblogs.com/lilixin/p/5775877.html
Kafka安装与使用
下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz 安装以及启动kafka 步骤1:安装kafka $ tar -xzf kafka_2.10-0.8.1.1.tgz $ cd kafka_2.10-0.8.1.1.tgz步骤2:配置server.properties 配置zookeeper(假设您已经安装了zookeeper,如果没有安装,请再网上搜索安装方法) 进入kafka安装工程根目录编辑 vim config/server.properties修改属性 zookeeper.connect=ip:2181,ip2: 2181 步骤3:server.properties配置说明 kafka最为重要三个配置依次为:broker.id、log.dir、zookeeper.connect kafka server端config/server.properties参数说明和解释如下: (参考配置说明地址:http://blog.csdn.net/lizhitao/article/details/25667831) #实际使用案例 这里211上面的kafka 配置文件 ![]() ![]() 步骤4: 启动kafka (先启动zookeeper $: bin/zkServer.sh start config/zookeeper.properties &) cd kafka-0.8.1 $ bin/kafka-server-start.sh -daemon config/server.properties &(实验时,需要启动至少两个broker bin/kafka-server-start.sh -daemon config/server-1.properties &) 步骤5:创建topic $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test步骤6:验证topic是否创建成功 $ bin/kafka-topics.sh --list --zookeeper localhost:2181localhost为zookeeper地址 topic描述: bin/kafka-topics.sh --describe --zookeeper 192.168.1.8:2181 --topic test
![]() ![]() 启动报错 Could not reserve enough space for object heap 原因及解决办法: 查看kafka-server-start.sh配置文件,发现有heap设置信息:KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" 更改这里的内存为256(因为测试机内存总共才1G ,所以报错)
步骤7:发送消息 发送一些消息验证,在console模式下,启动producer $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test(此处localhost改为本机ip,否则报错,I don’t know why) 消息案例: {"price":"100000","userId":14615501351480021,"payType":3,"code":"AFD3B8","payTime":{"time":1457330791333,"minutes":6,"seconds":31,"hours":14,"month":2,"year":116,"timezoneOffset":-480,"day":1,"date":7},"orderId":12222096,"goodsName":"会员"}
步骤8:启动一个consumer $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除 bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test --zookeeper 192.168.197.170:2181 ,192.168.197.171:2181
配置kafka集群模式,需要由多个broker组成
和单机环境一样,只是需要修改下broker 的配置文件而已。 1、将单机版的kafka 目录复制到其他几台电脑上。 2、修改每台电脑上的kafka 目录下的server.properties 文件。 broker.id=1//这个参数在kafka 的broker 集群中必须唯一,且为正整数。 3、启动每台电脑上的kafka 即可。 本机配置伪分布式 首先为每个节点编写配置文件: > cp config/server.properties config/server-1.properties > cp config/server.properties config/server-2.properties 在拷贝出的新文件中添加以下参数: config/server-1.properties: broker.id=1 port=9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 port=9094 log.dir=/tmp/kafka-logs-2现在启动另外两个节点: > bin/kafka-server-start.sh config/server-1.properties & ... > bin/kafka-server-start.sh config/server-2.properties & ... 创建一个拥有3个副本的topic: > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic 运行“"describe topics”命令知道每个节点的信息 > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 leader:负责处理消息的读和写,leader是从所有节点中随机选择的. replicas:列出了所有的副本节点,不管节点是否在服务中. isr:是正在服务中的节点. 搭建Kafka开发环境 1 在pom.xml中引入kafka依赖jar包 ![]() ![]() 2.属性文件 kafka.properties ![]() ![]() 在spring配置文件中引入此properties文件 ![]() ![]() 3.定义收信人 ![]() ![]() 4. spring中定义一个消息处理器(需要实现vkoConsumer)
5.消息生产者项目引入producer
|
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |