flume

您所在的位置:网站首页 flume采集数据到kafka的具体流程 flume

flume

2023-11-30 18:20| 来源: 网络整理| 查看: 265

目录

一、Zookeeper的安装

1、上传zookeeper-3.4.14.tar.gz到服务器,解压到/usr/local

2、修改Zookeeper保存数据的目录,dataDir

3、编辑/etc/profile,使配置生效

4、启动Zookeeper,确认zookeeper的状态

二、Kafka的安装

 1、上传kafka_2.12-1.0.2.tgz到服务器,解压到/usr/local

2、环境变量配置并生效

第一步:在kafka目录下 vi /etc/profile

第二步:添加以下内容

3、配置/usr/local/kafka/config/中的server.properties文件

第一步:修改配置文件:vi /etc/profile   

第二步:配置kafka连接zookeeper的地址以及配置kafka在zookeeper上的根目录

第三步:配置kafka存储持久化数据目录(持久化用户操作的存储位置,存放在/tmp下不合理)                                           指定存储位置

第四步:创建上述持久化数据目录

4、启动kafka 

第一步:进入到kafka的bin目录下

第二步:执行如下命令

5、重新开一个窗口,查看zookeeper的节点。(使用命令zkCli.sh)

6、此时kafka是前台模式启动,要停止,使用CTRL+C。

三、安装完之后的基本操作

四、生产与消费

五、flume的安装及使用

Flume实战

 需求一:从指定网络端口采集数据输出到控制台

 需求二:监控一个文件实时采集新增的数据输出到控制台

 需求三:将A服务器上的日志实时采集到B服务器

一、Zookeeper的安装

链接: https://pan.baidu.com/s/1HsAlRY5R3tkEpp-1mACWFg

提取码: k6pq

1、上传zookeeper-3.4.14.tar.gz到服务器,解压到/usr/local

(1)解压

tar -zxf zookeeper-3.4.14.tar.gz -C /usr/local

(2)将zookeeper-3.4.14重命名为zookeeper

mv zookeeper-3.4.14 zookeeper

2、修改Zookeeper保存数据的目录,dataDir

# 进入conf配置目录

 cd /usr/local/zookeeper/conf

# 复制zoo_sample.cfg命名为zoo.cfg                   

cp zoo_sample.cfg zoo.cfg

# 编辑zoo.cfg文件  

vim zoo.cfg dataDir=/var/dabing/zookeeper/data

 3、编辑/etc/profile,使配置生效

设置环境变量ZOO_LOG_DIR,指定zookeeper保存日志的位置;

ZOOKEEPER_PREFIX指向zookeeper的解压目录;

将zookeeper的bin目录添加到PATH中。

添加以下内容:

最后更新文件配置:

source /etc/profile

4、启动Zookeeper,确认zookeeper的状态

启动zookeeper:

zkServer.sh start

查看zookeeper状态:

zkServer.sh status

zk数据和日志存储目录:

cd /var/dabing/zookeeper/

到此,zookeeper安装完成。 

二、Kafka的安装

链接: https://pan.baidu.com/s/1uFbGo6CeL2sO6KA62gS0uQ

提取码: 39ah

 1、上传kafka_2.12-1.0.2.tgz到服务器,解压到/usr/local

2、环境变量配置并生效 第一步:在kafka目录下 vi /etc/profile

第二步:添加以下内容

3、配置/usr/local/kafka/config/中的server.properties文件

 

第一步:修改配置文件:vi /etc/profile   

第二步:配置kafka连接zookeeper的地址以及配置kafka在zookeeper上的根目录

Kafka连接zookeeper的地址,此处使用本地启动的zookeeper实例

连接地址是localhost:2181

后面的myKafka是kafka在zookeeper中的根节点路径

第三步:配置kafka存储持久化数据目录(持久化用户操作的存储位置,存放在/tmp下不合理)                                            指定存储位置

第四步:创建上述持久化数据目录

4、启动kafka  第一步:进入到kafka的bin目录下

第二步:执行如下命令 kafka-server-start.sh $KAFKA/config/server.properties

 

 启动成功,可以看到控制台输出的最后一行的started状态:此时kafka安装成功。

 

5、重新开一个窗口,查看zookeeper的节点。(使用命令zkCli.sh)

6、此时kafka是前台模式启动,要停止,使用CTRL+C。

如果要后台启动,使用命令: 

kafka-server-start.sh -daemon $KAFKA/config/server.properties

查看kafka的后台进程:

ps aux | grep kafka 

 

停止后台运行的kafka:

kafka-server-stop.sh

 三、安装完之后的基本操作

1、查看zk状态  (在kafka目录下)   

zkServer.sh status

2、启动kafka     (在kafka的bin目录下)              

kafka-server-start.sh -daemon $KAFKA/config/ server.properties

3、查看kafka进程  (在kafka的bin目录下) 

ps aux | grep kafka

4、关闭kafka (在kafka的bin目录下)

Kafka-server-stop.sh

5、查看zookeeper节点 (在zookeeper的bin目录下)

zkCli.sh

四、生产与消费

# 列出现有的主题

kafka-topics.sh --list --zookeeper localhost:2181/myKafka

# 创建主题,该主题包含一个分区,该分区为Leader分区,它没有Follower分区副本。

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test --partitions 1   --replication-factor 1

# 查看分区信息

kafka-topics.sh --zookeeper localhost:2181/myKafka --list

# 查看指定主题的详细信息

kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test 

# 删除指定主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_test 

发送消息:

kafka-console-producer.sh --broker-list localhost:9092 --topic topic_test2

消费消息:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test2 --from-beginning

 详细步骤

SpringBoot Kafka 整合使用

SpringBoot集成kafka全面实战

五、flume的安装及使用

链接: https://pan.baidu.com/s/1sWrKiYpDZqZrg-9bK3yPPA

提取码: 562y

Flume实战    需求一:从指定网络端口采集数据输出到控制台

第一步:Flume部署

      (首先在flume/conf目录下创建一个文件example.conf 进行编辑vi example.conf。

加入以下内容:

a1.sources  =  r1 a1.sinks  =  k1 a1.channels  =  c1

# 描述/配置源

a1.sources.r1.type  =  netcat a1.sources.r1.bind  =  localhost a1.sources.r1.port  =  44444

# 描述接收器

a1.sinks.k1.type  =  logger

# 使用一个通道缓冲内存中的事件

a1.channels.c1.type  =  memory a1.channels.c1.capacity  =  1000 a1.channels.c1.transactionCapacity  =  100

# 将source和sink绑定到通道

a1.sources.r1.channels  =  c1 a1.sinks.k1.channel  =  c1

第二步:启动flume  

.bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

bin/flume-ng agent -n a1 -c conf -f conf/example.conf -Dflume.root.logger=INFO,console

第三步:重新开一台控制台(前提是要先安装telnet)

 指令  

telnet localhost 44444

结果:

 查看占用端口的进程: (44444为端口号)

netstat -nltp | grep 44444

杀死进程:

Kill -9 44444 

 需求二:监控一个文件实时采集新增的数据输出到控制台

第一步:首先在flume/conf目录下创建一个文件exec-memory-logger.conf 进行编辑vi exec-memory-logger.conf。

加入以下内容:

a1.sources  =  r1 a1.sinks  =  k1 a1.channels = c1

# 描述接收器

a1.sinks.k1.type  =  logger a1.sources.r1.type = exec a1.sources.r1.command = tail -F /usr/local/flume/data.log a1.sources.r1.shell = /bin/sh -c a1.channels.c1.capacity  =  1000 a1.channels.c1.transactionCapacity  =  100

# 使用一个通道缓冲内存中的事件

a1.channels.c1.type  =  memory

# 将source和sink绑定到通道

a1.sources.r1.channels  =  c1 a1.sinks.k1.channel  =  c1

第二步:在flume目录下创建一个空文件(touch data.log)

第三步:重开一个控制台  在flume目录下使用echo helloworld >> data.log

结果:

需求三:将A服务器上的日志实时采集到B服务器

在flume/conf目录下创建两个空文件               

touch avro-memory-logger.conf   touch exec.conf

vi编辑两个文件

编辑

vi avro-memory-logger.conf exec-memory-avro.sources  =  exec-source exec-memory-avro.sinks  =  avro-sink exec-memory-avro.channels = memory-channel

# 描述接收器

exec-memory-avro.conf.sinks.avro-sink.type  =  avro exec-memory-avro.conf.sinks.avro-sink.hostname  =  liufang exec-memory-avro.conf.sinks.avro-sink.port  =  44444 exec-memory-avro.conf.sources.exec-source.type = exec exec-memory-avro.conf.sources.exec-source.command = tail -F /usr/local/flume/data.log exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# 使用一个通道缓冲内存中的事件

exec-memory-avro.channels.memory-channel.type  =  memory

# 将source和sink绑定到通道

exec-memory-avro.sources.exec-source.channels  = memory-channel exec-memory-avro.sinks.avro-sink.channel  = memory-channel

编辑vi exec.conf

avro-memory-logger.sources  =  avro-source avro-memory-logger.sinks  =  logger-sink avro-memory-logger.channels = memory-channel

# 描述接收器

avro-memory-logger.sinks.logger-sink.type  =  avro avro-memory-logger.sinks.logger-sink.bind  =  liufang avro-memory-logger.sinks.logger-sink.post  =  44444 avro-memory-logger.sources.avro-source.type = logger

# 使用一个通道缓冲内存中的事件

avro-memory-logger.channels.memory-channel.type  =  memory

# 将source和sink绑定到通道

avro-memory-logger.sources.avro-source.channels  = memory-channel avro-memory-logger.conf.sinks.logger-sink.channel  = memory-channel

重新开一台控制台: 先启动avro-memory-logger.conf

./bin/flume-ng agent --conf conf --conf-file conf/avro-memory-logger.conf --name a1 -Dflume.root.logger=INFO,console

然后启动  exec.conf

./bin/flume-ng agent --conf conf --conf-file conf/exec.conf --name a1 -Dflume.root.logger=INFO,console

结果:



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3