Flume采集日志到Kafka |
您所在的位置:网站首页 › flume采集数据到datahub › Flume采集日志到Kafka |
image.png
1.日志采集Flume安装
详情见:https://www.jianshu.com/p/a2590b997e8e?v=1698800791912 集群规划: 服务器hadoop101 服务器hadoop102 服务器hadoop103 Flume(采集日志) Flume Flume 2.项目经验之Flume组件选型1)Source (1)Taildir Source相比Exec Source、Spooling Directory Source的优势 TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。 Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。 Spooling Directory Source监控目录,支持断点续传。 (2)batchSize大小如何设置? 答:Event 1K左右时,500-1000合适(默认为100) 2)Channel 采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。 注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。 3.日志采集Flume配置 image.pngFlume直接读log日志的数据,log日志的格式是app.yyyy-MM-dd.log。 2)Flume的具体配置如下: (1)在 /opt/module/flume 目录下创建 job 目录 [yobhel@hadoop101 flume]$ mkdir job进入 job 目录,创建 file-flume-kafka.conf 文件 [yobhel@hadoop101 job]$ vim file-flume-kafka.conf在文件配置如下内容 #为各组件命名 a1.sources = r1 a1.channels = c1 #描述source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/log/app.* a1.sources.r1.positionFile = /opt/module/data_mocker/log_position.json a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.yobhel.flume.interceptors.ETLInterceptor$Builder #描述channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop102:9092,hadoop103:9092 a1.channels.c1.kafka.topic = topic_log a1.channels.c1.parseAsFlumeEvent = false #绑定source和channel以及sink和channel的关系 a1.sources.r1.channels = c1注意:com.yobhel.flume.interceptor.ETLInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。 4 Flume拦截器https://github.com/Yobhel121/edu-flume-interceptor 1)需要先将打好的包放入到hadoop101的/opt/module/flume/lib文件夹下面。 [yobhel@hadoop101 lib]$ ls | grep edu-flume-interceptor edu-flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar2)分发Flume到hadoop102、hadoop103 [yobhel@hadoop101 module]$ xsync flume/3)在hadoop101上启动Flume [yobhel@hadoop101 flume]$ bin/flume-ng agent --name a1 --conf-file job/file-flume-kafka.conf & 5.日志采集Flume启动停止脚本1)在/home/yobhel/bin目录下创建脚本f1.sh [yobhel@hadoop101 bin]$ vim f1.sh在脚本中填写如下内容 #! /bin/bash case $1 in "start"){ for i in hadoop101 hadoop102 do echo " --------启动 $i 采集flume-------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/job/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &" done };; "stop"){ for i in hadoop101 hadoop102 do echo " --------停止 $i 采集flume-------" ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 " done };; esac 说明1:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。 说明2:awk 默认分隔符为空格。 说明3:xargs 表示取出前面命令运行的结果,作为后面命令的输入参数。 2)增加脚本执行权限 [yobhel@hadoop101 bin]$ chmod +x f1.sh3)f1集群启动脚本 [yobhel@hadoop101 module]$ f1.sh start4)f1集群停止脚本 [yobhel@hadoop101 module]$ f1.sh stop |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |