Flume案例五:实时监控多个目录下多个追加文件(Taildir Source)

您所在的位置:网站首页 cache技术为什么不适用于实时处理文件 Flume案例五:实时监控多个目录下多个追加文件(Taildir Source)

Flume案例五:实时监控多个目录下多个追加文件(Taildir Source)

2024-07-12 08:29| 来源: 网络整理| 查看: 265

本文接上篇博客:Flume介绍、安装、使用案例、自定义Source/Sink、监控 Flume 版本:1.9.0 本文hdfs sink,需 Hadoop 支持,Hadoop相关内容,请参考:Hadoop专栏

1.实时监控多个目录下多个追加文件

选型:taildir source + memory channel + hdfs sink

文档参考:  taildir source:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#taildir-source  memory channel:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#memory-channel  hdfs sink:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#hdfs-sink   提示:   Exec source 适用于监控一个实时追加的文件,但不能保证数据不丢失;Spooling Directory Source 能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控;而 Taildir Source 既能够实现断点续传,又可以保证数据不丢失(通过positionFile属性记录读取位置),还能够进行实时监控,集两者优点于一身,更推荐使用Taildir Source。

2.需求分析

在这里插入图片描述

3.flume配置

flume-taildir-hdfs.conf

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /opt/module/flume/position/taildir_position.json a1.sources.r1.filegroups = f1 f2 a1.sources.r1.filegroups.f1 = /opt/module/testdir/test.log a1.sources.r1.filegroups.f2 = /opt/module/testdir2/file.* # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/taildir/%Y-%m-%d/%H a1.sinks.k1.hdfs.filePrefix = taildir # 是否使用本地时间戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true # 是否按照时间滚动文件夹 a1.sinks.k1.hdfs.round = true # 多少时间单位创建一个新的文件夹 a1.sinks.k1.hdfs.roundValue = 1 # 重新定义时间单位 a1.sinks.k1.hdfs.roundUnit = hour # 积攒多少个 Event 才 flush 到 HDFS 一次 a2.sinks.k2.hdfs.batchSize = 1000 # 多久生成一个新的文件(seconds) a1.sinks.k1.hdfs.rollInterval = 30 # 设置每个文件的滚动大小 a1.sinks.k1.hdfs.rollSize = 134217700 # 文件的滚动与 Event 数量无关 a1.sinks.k1.hdfs.rollCount = 0 # 设置文件类型,可支持压缩(不加该配置的话,Flume写入HDFS的文件会出现SEQ !org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable) a1.sinks.k1.hdfs.fileType = DataStream # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 4.启动命令 bin/flume-ng agent -c conf -n a1 -f job/flume-taildir-hdfs.conf 5.异常处理

写入HDFS,报如下错误:java.lang.NoSuchMethodError:com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V

  这是因为hadoop目录下得guava版本和flume下的guava版本的问题。进入 flume/lib 目录下,将 guava-11.0.2.jar 包移除即可

6.测试图示

Taildir Source 实时监听 testdir/test.log 和 /testdir2/file.*后缀的所有文件。

echo 方式追加数据至 test.log 文件,模拟实时日志;(可代替 exec source)提前准备好测试文件file.txt,测试 mv 移动文件到 testdir2 目录下;(可代替 spooling directory source)testdir2 文件夹下,创建file.tmp 文件,echo 方式追加数据至 file.tmp 文件。  

flume 端能够正常接收到新增的文件,通过 hdfs sink方式,将文件内容写出到 /flume/taildir 目录下,测试数据如图所示: 在这里插入图片描述

测试结果,如图所示: 在这里插入图片描述

博主写作不易,加个关注呗

求关注、求点赞,加个关注不迷路 ヾ(◍°∇°◍)ノ゙

我不能保证所写的内容都正确,但是可以保证不复制、不粘贴。保证每一句话、每一行代码都是亲手敲过的,错误也请指出,望轻喷 Thanks♪(・ω・)ノ



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3