非结构数据采集工具

您所在的位置:网站首页 flume采集数据会丢失吗 非结构数据采集工具

非结构数据采集工具

2023-10-13 05:20| 来源: 网络整理| 查看: 265

1、Flume简介

  Apache Flume是一种分布式、可靠和可用的系统,用于高效收集、聚合,以及将大量日志数据从许多不同的来源移动到集中式数据存储上。使用Apache Flume不仅限于日志数据的聚合。由于数据源是可定制的,因此可以使用Flume来传输大量的事件数据,包括但不限于网络流量数据、社交媒体生成的数据、电子邮件消息和其他数据源。

  Flume使用两个独立的事务负责从Source到Channel及从Channel到Sink的事件传递。Channel中的File Channel具有持久性,事件写入File Channel后,即使Agent重新启动,事件也不会丢失。Flume中还提供了一种Memory Channel的方式,但它不具有持久存储的能力,但是与File Channel相比,MemoryChannel的优点是具有较高的吞吐量。

  Flume的主要组件有Event、Client、Agent、Source、Channel和Sink等。

事件Event Event是Flume数据传输的基本单元,Flume以事件的形式将数据从源头传送到最终目的。代理Agent Agent是Flume流的基础部分,一个Agent包含Source、Channel、Sink和其他组件,它基于这些组件把Event从一个节点传输到另一个节点或最终目的地上,由Flume为这些组件提供配置、生命周期管理和监控支持。Source Source的主要职责是接收Event,并将Event批量地放到一个或者多个Channel中。Spooling Directory Source是通过读取硬盘上需要被收集数据的文件到spooling目录来获取数据,然后再将数据发送到Channel。该Source会监控指定的目录来发现新文件并解析新文件。在给定的文件已被读完之后,它被重命名为指示完成(或可选地删除)。Exec源在启动时运行给定的UNIX命令,并期望该进程在标准输出上连续生成数据。如果进程由于任何原因退出,则源也将退出并且不会继续产生数据。Channel Channel位于Source和Sink之间,用于缓存Event,当Sink成功将Event发送到下一个Agent或最终目的处之后,会将Event从Channel上移除。Memory Channel是指Events被存储在已配置最大容量的内存队列中,因此它不具有持久存储能力。File Channel具有持久性,只要事件被写入Channel,即使代理重新启动,事件也不会丢失,能保障数据的完整性。Sink Sink的主要职责是将Event传输到下一个Agent或最终目的处,成功传输完成后将Event从Channel中移除。Sink主要分为两大类:File Roll Sink和Hdfs Sink。File Roll Sink是指将事件写入本地文件系统中,首先我们要在本地文件系统中创建一个缓冲目录。HDFS Sink是指将事件写入Hadoop分布式文件系统(HDFS)。它可以根据经过的时间、数据大小或事件数量定期滚动文件,也就是关闭当前文件并创建新文件。其他 Interceptor组件主要作用于Source,可以按照特定的顺序对Events进行装饰或过滤。Sink Group允许用户将多个Sink组合在一起,Sink Processor则能够通过组中的Sink切换来实现负载均衡,也可以在一个Sink出现故障时切换转到另一个Sink。 2、Flume安装

1、下载

wget https://mirrors.bfsu.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

2、解压

tar -zxvf apache-flume-1.9.0-bin.tar.gz -C ../servers/

3、设置环境变量

vim /etc/profile

增加如下配置:

export FLUME_HOME=/export/servers/apache-flume-1.9.0-bin export PATH=:$FLUME_HOME/bin:$PATH

刷新配置

source /etc/profile 3、Flume实现本地文件读取和写入

1、创建文件目录

#作为数据源 mkdir /export/source #作为输出目录 mkdir /export/dist

2、创建配置文件 配置文件是实现Flume数据采集的核心。这里主要配置采集源和输出目录等信息,如下所示:

a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置 Source a1.sources.r1.type =spooldir a1.sources.r1.spoolDir=/export/source #配置Sink a1.sinks.k1.type =file_roll a1.sinks.k1.sink.directory=/export/dist # 设置Channel类型为Memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 把 Source 和 Sink 绑到 Channel上 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

3、启动Flume代理

bin/flume-ng agent -n a1 -c conf -f ./job/job1.conf -Dflume.root.logger=INFO,console

4、测试 启动成功后,在source目录下,创建test.txt文件,然后会看到控制台打印如下内容: 在这里插入图片描述

这个时候,source目录下的文件变成了test.txt.COMPLETED,说明读取成功了。 在这里插入图片描述

4、Flume实现基于HDFS的收集

1、编写配置文件

a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置 Source a1.sources.r1.type =spooldir a1.sources.r1.spoolDir=/export/source # 配置 Sink a1.sinks.k1.type =hdfs #按照%Y-%m-%d/%H%M格式分开存储文件 a1.sinks.k1.hdfs.path=hdfs://node01:8020/flume/data/%Y-%m-%d/%H%M a1.sinks.k1.hdfs.rollInterval=0 a1.sinks.k1.hdfs.rollSize=10240000 a1.sinks.k1.hdfs.rollCount=0 a1.sinks.k1.hdfs.idleTimeout=3 a1.sinks.k1.hdfs.fileType=DataStream a1.sinks.k1.hdfs.round=true a1.sinks.k1.hdfs.roundValue=10 a1.sinks.k1.hdfs.roundUnit=minute a1.sinks.k1.hdfs.useLocalTimeStamp=true #a1.sinks.k1.type =hdfs # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定 Source 和 Sink 到 Channel 上 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

其中,a1.sinks.k1.hdfs.path的值,根据自己的配置填写。

2、启动Flume代理

bin/flume-ng agent -n a1 -c conf -f ./job/job2.conf -Dflume.root.logger=INFO,console

3、测试 启动成功后,在source目录下,创建t1.txt文件。然后访问hadoop的hdfs,比如:http://192.168.1.8:50070/explorer.html#/flume/data,就可以看见上传到hdfs的文件。 在这里插入图片描述

5、Flume实现通过exec命令收集数据 a1.sources = logSource a1.channels = fileChannel a1.sinks = hdfsSink #指定Source的类型是exec a1.sources.logSource.type = exec #指定命令是tial -F,持续监测/export/dist/test.txt中的数据 a1.sources.logSource.command = tail -F /export/dist/test.txt # 将Channel设置为fileChannel a1.sources.logSource.channels = fileChannel # 设置Sink为HDFS a1.sinks.hdfsSink.type = hdfs #文件生成的时间 a1.sinks.hdfsSink.hdfs.path = hdfs://master:8020/flume/record/%Y-%m-%d/%H%M a1.sinks.hdfsSink.hdfs.filePrefix= transaction_log a1.sinks.hdfsSink.hdfs.rollInterval= 600 a1.sinks.hdfsSink.hdfs.rollCount= 10000 a1.sinks.hdfsSink.hdfs.rollSize= 0 a1.sinks.hdfsSink.hdfs.round = true a1.sinks.hdfsSink.hdfs.roundValue = 10 a1.sinks.hdfsSink.hdfs.roundUnit = minute a1.sinks.hdfsSink.hdfs.fileType = DataStream a1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true #Specify the channel the sink should use a1.sinks.hdfsSink.channel = fileChannel # 设置 Channel的类型为file,并设置断点目录和channel数据存放目录 a1.channels.fileChannel.type = file a1.channels.fileChannel.checkpointDir= /export/flume/dataCheckpointDir a1.channels.fileChannel.dataDirs= /export/flume/dataDir

2、启动Flume代理

bin/flume-ng agent -n a1 -c conf -f ./job/job3.conf -Dflume.root.logger=INFO,console


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3