Flume采集日志到Kafka

您所在的位置:网站首页 flume采集数据到datahub Flume采集日志到Kafka

Flume采集日志到Kafka

2024-03-04 17:06| 来源: 网络整理| 查看: 265

image.png 1.日志采集Flume安装

详情见:https://www.jianshu.com/p/a2590b997e8e?v=1698800791912

集群规划:

服务器hadoop101 服务器hadoop102 服务器hadoop103 Flume(采集日志) Flume Flume 2.项目经验之Flume组件选型

1)Source

(1)Taildir Source相比Exec Source、Spooling Directory Source的优势 TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。 Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。 Spooling Directory Source监控目录,支持断点续传。

(2)batchSize大小如何设置? 答:Event 1K左右时,500-1000合适(默认为100)

2)Channel 采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。 注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

3.日志采集Flume配置 image.png

Flume直接读log日志的数据,log日志的格式是app.yyyy-MM-dd.log。

2)Flume的具体配置如下: (1)在 /opt/module/flume 目录下创建 job 目录

[yobhel@hadoop101 flume]$ mkdir job

进入 job 目录,创建 file-flume-kafka.conf 文件

[yobhel@hadoop101 job]$ vim file-flume-kafka.conf

在文件配置如下内容

#为各组件命名 a1.sources = r1 a1.channels = c1 #描述source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/log/app.* a1.sources.r1.positionFile = /opt/module/data_mocker/log_position.json a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.yobhel.flume.interceptors.ETLInterceptor$Builder #描述channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop102:9092,hadoop103:9092 a1.channels.c1.kafka.topic = topic_log a1.channels.c1.parseAsFlumeEvent = false #绑定source和channel以及sink和channel的关系 a1.sources.r1.channels = c1

注意:com.yobhel.flume.interceptor.ETLInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。

4 Flume拦截器

https://github.com/Yobhel121/edu-flume-interceptor

1)需要先将打好的包放入到hadoop101的/opt/module/flume/lib文件夹下面。

[yobhel@hadoop101 lib]$ ls | grep edu-flume-interceptor edu-flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

2)分发Flume到hadoop102、hadoop103

[yobhel@hadoop101 module]$ xsync flume/

3)在hadoop101上启动Flume

[yobhel@hadoop101 flume]$ bin/flume-ng agent --name a1 --conf-file job/file-flume-kafka.conf & 5.日志采集Flume启动停止脚本

1)在/home/yobhel/bin目录下创建脚本f1.sh

[yobhel@hadoop101 bin]$ vim f1.sh

在脚本中填写如下内容

#! /bin/bash case $1 in "start"){ for i in hadoop101 hadoop102 do echo " --------启动 $i 采集flume-------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/job/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &" done };; "stop"){ for i in hadoop101 hadoop102 do echo " --------停止 $i 采集flume-------" ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 " done };; esac 说明1:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。 说明2:awk 默认分隔符为空格。 说明3:xargs 表示取出前面命令运行的结果,作为后面命令的输入参数。 2)增加脚本执行权限 [yobhel@hadoop101 bin]$ chmod +x f1.sh

3)f1集群启动脚本

[yobhel@hadoop101 module]$ f1.sh start

4)f1集群停止脚本

[yobhel@hadoop101 module]$ f1.sh stop


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3