flume |
您所在的位置:网站首页 › flume采集数据到kafka的具体流程 › flume |
目录 一、Zookeeper的安装 1、上传zookeeper-3.4.14.tar.gz到服务器,解压到/usr/local 2、修改Zookeeper保存数据的目录,dataDir 3、编辑/etc/profile,使配置生效 4、启动Zookeeper,确认zookeeper的状态 二、Kafka的安装 1、上传kafka_2.12-1.0.2.tgz到服务器,解压到/usr/local 2、环境变量配置并生效 第一步:在kafka目录下 vi /etc/profile 第二步:添加以下内容 3、配置/usr/local/kafka/config/中的server.properties文件 第一步:修改配置文件:vi /etc/profile 第二步:配置kafka连接zookeeper的地址以及配置kafka在zookeeper上的根目录 第三步:配置kafka存储持久化数据目录(持久化用户操作的存储位置,存放在/tmp下不合理) 指定存储位置 第四步:创建上述持久化数据目录 4、启动kafka 第一步:进入到kafka的bin目录下 第二步:执行如下命令 5、重新开一个窗口,查看zookeeper的节点。(使用命令zkCli.sh) 6、此时kafka是前台模式启动,要停止,使用CTRL+C。 三、安装完之后的基本操作 四、生产与消费 五、flume的安装及使用 Flume实战 需求一:从指定网络端口采集数据输出到控制台 需求二:监控一个文件实时采集新增的数据输出到控制台 需求三:将A服务器上的日志实时采集到B服务器 一、Zookeeper的安装链接: https://pan.baidu.com/s/1HsAlRY5R3tkEpp-1mACWFg 提取码: k6pq 1、上传zookeeper-3.4.14.tar.gz到服务器,解压到/usr/local(1)解压 tar -zxf zookeeper-3.4.14.tar.gz -C /usr/local(2)将zookeeper-3.4.14重命名为zookeeper mv zookeeper-3.4.14 zookeeper 2、修改Zookeeper保存数据的目录,dataDir# 进入conf配置目录 cd /usr/local/zookeeper/conf# 复制zoo_sample.cfg命名为zoo.cfg cp zoo_sample.cfg zoo.cfg# 编辑zoo.cfg文件 vim zoo.cfg dataDir=/var/dabing/zookeeper/data设置环境变量ZOO_LOG_DIR,指定zookeeper保存日志的位置; ZOOKEEPER_PREFIX指向zookeeper的解压目录; 将zookeeper的bin目录添加到PATH中。 添加以下内容: 最后更新文件配置: source /etc/profile 4、启动Zookeeper,确认zookeeper的状态启动zookeeper: zkServer.sh start查看zookeeper状态: zkServer.sh statuszk数据和日志存储目录: cd /var/dabing/zookeeper/到此,zookeeper安装完成。 二、Kafka的安装链接: https://pan.baidu.com/s/1uFbGo6CeL2sO6KA62gS0uQ 提取码: 39ah 1、上传kafka_2.12-1.0.2.tgz到服务器,解压到/usr/local Kafka连接zookeeper的地址,此处使用本地启动的zookeeper实例 连接地址是localhost:2181 后面的myKafka是kafka在zookeeper中的根节点路径 第三步:配置kafka存储持久化数据目录(持久化用户操作的存储位置,存放在/tmp下不合理) 指定存储位置 启动成功,可以看到控制台输出的最后一行的started状态:此时kafka安装成功。 如果要后台启动,使用命令: kafka-server-start.sh -daemon $KAFKA/config/server.properties查看kafka的后台进程: ps aux | grep kafka 停止后台运行的kafka: kafka-server-stop.sh 三、安装完之后的基本操作1、查看zk状态 (在kafka目录下) zkServer.sh status2、启动kafka (在kafka的bin目录下) kafka-server-start.sh -daemon $KAFKA/config/ server.properties3、查看kafka进程 (在kafka的bin目录下) ps aux | grep kafka4、关闭kafka (在kafka的bin目录下) Kafka-server-stop.sh5、查看zookeeper节点 (在zookeeper的bin目录下) zkCli.sh 四、生产与消费# 列出现有的主题 kafka-topics.sh --list --zookeeper localhost:2181/myKafka# 创建主题,该主题包含一个分区,该分区为Leader分区,它没有Follower分区副本。 kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test --partitions 1 --replication-factor 1# 查看分区信息 kafka-topics.sh --zookeeper localhost:2181/myKafka --list# 查看指定主题的详细信息 kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test# 删除指定主题 kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_test发送消息: kafka-console-producer.sh --broker-list localhost:9092 --topic topic_test2消费消息: kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test2 --from-beginning详细步骤 SpringBoot Kafka 整合使用 SpringBoot集成kafka全面实战 五、flume的安装及使用链接: https://pan.baidu.com/s/1sWrKiYpDZqZrg-9bK3yPPA 提取码: 562y Flume实战 需求一:从指定网络端口采集数据输出到控制台第一步:Flume部署 (首先在flume/conf目录下创建一个文件example.conf 进行编辑vi example.conf。 加入以下内容: a1.sources = r1 a1.sinks = k1 a1.channels = c1# 描述/配置源 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444# 描述接收器 a1.sinks.k1.type = logger# 使用一个通道缓冲内存中的事件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100# 将source和sink绑定到通道 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1第二步:启动flume .bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console或 bin/flume-ng agent -n a1 -c conf -f conf/example.conf -Dflume.root.logger=INFO,console第三步:重新开一台控制台(前提是要先安装telnet) 指令 telnet localhost 44444结果: 查看占用端口的进程: (44444为端口号) netstat -nltp | grep 44444杀死进程: Kill -9 44444 需求二:监控一个文件实时采集新增的数据输出到控制台第一步:首先在flume/conf目录下创建一个文件exec-memory-logger.conf 进行编辑vi exec-memory-logger.conf。 加入以下内容: a1.sources = r1 a1.sinks = k1 a1.channels = c1# 描述接收器 a1.sinks.k1.type = logger a1.sources.r1.type = exec a1.sources.r1.command = tail -F /usr/local/flume/data.log a1.sources.r1.shell = /bin/sh -c a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100# 使用一个通道缓冲内存中的事件 a1.channels.c1.type = memory# 将source和sink绑定到通道 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1第二步:在flume目录下创建一个空文件(touch data.log) 第三步:重开一个控制台 在flume目录下使用echo helloworld >> data.log 结果: 在flume/conf目录下创建两个空文件 touch avro-memory-logger.conf touch exec.confvi编辑两个文件 编辑 vi avro-memory-logger.conf exec-memory-avro.sources = exec-source exec-memory-avro.sinks = avro-sink exec-memory-avro.channels = memory-channel# 描述接收器 exec-memory-avro.conf.sinks.avro-sink.type = avro exec-memory-avro.conf.sinks.avro-sink.hostname = liufang exec-memory-avro.conf.sinks.avro-sink.port = 44444 exec-memory-avro.conf.sources.exec-source.type = exec exec-memory-avro.conf.sources.exec-source.command = tail -F /usr/local/flume/data.log exec-memory-avro.sources.exec-source.shell = /bin/sh -c# 使用一个通道缓冲内存中的事件 exec-memory-avro.channels.memory-channel.type = memory# 将source和sink绑定到通道 exec-memory-avro.sources.exec-source.channels = memory-channel exec-memory-avro.sinks.avro-sink.channel = memory-channel编辑vi exec.conf avro-memory-logger.sources = avro-source avro-memory-logger.sinks = logger-sink avro-memory-logger.channels = memory-channel# 描述接收器 avro-memory-logger.sinks.logger-sink.type = avro avro-memory-logger.sinks.logger-sink.bind = liufang avro-memory-logger.sinks.logger-sink.post = 44444 avro-memory-logger.sources.avro-source.type = logger# 使用一个通道缓冲内存中的事件 avro-memory-logger.channels.memory-channel.type = memory# 将source和sink绑定到通道 avro-memory-logger.sources.avro-source.channels = memory-channel avro-memory-logger.conf.sinks.logger-sink.channel = memory-channel重新开一台控制台: 先启动avro-memory-logger.conf ./bin/flume-ng agent --conf conf --conf-file conf/avro-memory-logger.conf --name a1 -Dflume.root.logger=INFO,console然后启动 exec.conf ./bin/flume-ng agent --conf conf --conf-file conf/exec.conf --name a1 -Dflume.root.logger=INFO,console结果: |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |