Flume Series: Common Flume Collection Pipeline Examples


Contents

Apache Hadoop Ecosystem - Consolidated Table of Contents (continuously updated)

1: file->flume->kafka

2: kafka->flume->hdfs (Option 1)

3: kafka->flume->hdfs (Option 2)


System environment: CentOS 7

Java environment: Java 8

1: file->flume->kafka

tailDir source -> kafka channel

Configuration file: file_flume_kafka.conf

# 1: Define the components
file_flume_kafka.sources = r1
file_flume_kafka.channels = c1

# 2: Define the source
file_flume_kafka.sources.r1.type = TAILDIR
file_flume_kafka.sources.r1.positionFile = /usr/local/flume-1.9.0/project_v4/tail_dir.json
file_flume_kafka.sources.r1.filegroups = f1
file_flume_kafka.sources.r1.filegroups.f1 = /log/app.*.log
## Define a source interceptor (ETL data cleaning: checks whether each record is complete)
file_flume_kafka.sources.r1.interceptors = i1
file_flume_kafka.sources.r1.interceptors.i1.type = com.wester.flume.interceptor.ETLInterceptor$Builder

# 3: Define the channel
file_flume_kafka.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
file_flume_kafka.channels.c1.kafka.bootstrap.servers = 192.168.5.103:9092,192.168.5.87:9092,192.168.5.114:9092
file_flume_kafka.channels.c1.kafka.topic = project_v4_topic_log
## Set the consumer group so each run resumes from its last committed offset
file_flume_kafka.channels.c1.kafka.consumer.group.id = file_flume_kafka
# Write plain message bodies without the Flume-assembled header
file_flume_kafka.channels.c1.parseAsFlumeEvent = false

# 4: Define the sink - not needed here

# 5: Wire the components together
file_flume_kafka.sources.r1.channels = c1
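The ETL interceptor referenced above is a custom class shipped in the project's own jar (project_v4_flume.jar); its source code is not part of this post. As a hedged sketch of what such a cleaning interceptor might look like, assuming each event body is a single JSON log line and that a trivial completeness check suffices (the real class presumably uses a proper JSON parser), the com.wester.flume.interceptor.ETLInterceptor$Builder reference could be satisfied by:

package com.wester.flume.interceptor;

// Hypothetical sketch only; the real class lives in project_v4_flume.jar.
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() { }

    // Drop events whose body is not a complete JSON object.
    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8).trim();
        // Placeholder completeness check; a real implementation would
        // parse the body with a JSON library instead.
        boolean complete = body.startsWith("{") && body.endsWith("}");
        return complete ? event : null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // Remove every event the single-event check rejects.
        events.removeIf(event -> intercept(event) == null);
        return events;
    }

    @Override
    public void close() { }

    // Flume instantiates interceptors through this nested Builder,
    // which is why the i1.type property ends in $Builder.
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() { return new ETLInterceptor(); }

        @Override
        public void configure(Context context) { }
    }
}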

Start the agent:

nohup /usr/local/flume-1.9.0/bin/flume-ng agent \
  --conf-file /shucang_v4/project_files/flume_jobs/conf/file_flume_kafka.conf \
  -C /shucang_v4/project_files/flume_jobs/jar/project_v4_flume.jar \
  --name file_flume_kafka \
  -Dflume.root.logger=INFO,LOGFILE \
  >/usr/local/flume-1.9.0/logs/file_flume_kafka.log 2>&1 &

2: kafka->flume->hdfs (Option 1)

kafka source -> file channel -> hdfs sink

Drawback: compared with Option 2 there is an extra source stage, so performance is slightly lower.

Advantage: having a source means an extra interceptor can be attached, which may be useful depending on your needs.

Configuration file: kafka_flume_hdfs.conf

# 1: Define the components
kafka_flume_hdfs.sources = r1
kafka_flume_hdfs.channels = c1
kafka_flume_hdfs.sinks = k1

# 2: Define the source
kafka_flume_hdfs.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
kafka_flume_hdfs.sources.r1.kafka.bootstrap.servers = 192.168.5.103:9092,192.168.5.87:9092,192.168.5.114:9092
kafka_flume_hdfs.sources.r1.kafka.topics = project_v4_topic_log
kafka_flume_hdfs.sources.r1.batchSize = 5000
kafka_flume_hdfs.sources.r1.batchDurationMillis = 2000
# Consume from the earliest offset - common for non-real-time scenarios
kafka_flume_hdfs.sources.r1.kafka.consumer.auto.offset.reset = earliest
## Configure the timestamp interceptor (fixes the midnight-drift problem)
kafka_flume_hdfs.sources.r1.interceptors = i1
kafka_flume_hdfs.sources.r1.interceptors.i1.type = com.wester.flume.interceptor.TimeStampInterceptor$Builder

# 3: Define the channel
kafka_flume_hdfs.channels.c1.type = file
# Where the file channel stores its data
kafka_flume_hdfs.channels.c1.dataDirs = /usr/local/flume-1.9.0/data/project_v4
# Where the file channel keeps its checkpoint
kafka_flume_hdfs.channels.c1.checkpointDir = /usr/local/flume-1.9.0/checkpointDir/project_v4

# 4: Define the sink
kafka_flume_hdfs.sinks.k1.type = hdfs
kafka_flume_hdfs.sinks.k1.hdfs.path = hdfs://hadoop322ha/project/v4/log/topic_log/%Y-%m-%d
# Prefix for the files written to HDFS
kafka_flume_hdfs.sinks.k1.hdfs.filePrefix = logs-
# Whether to round the timestamp used in the path (e.g. hourly buckets);
# if set to true, roundValue and roundUnit must be set as well
kafka_flume_hdfs.sinks.k1.hdfs.round = false
## Keep small files under control
# Roll the file after this many seconds so it never stalls waiting for rollSize;
# 3600 is a typical production value, 20 here only so test output appears quickly
kafka_flume_hdfs.sinks.k1.hdfs.rollInterval = 20
# Roll the file once it reaches this size (128 MB)
kafka_flume_hdfs.sinks.k1.hdfs.rollSize = 134217728
# Roll after this many events; 0 disables count-based rolling
kafka_flume_hdfs.sinks.k1.hdfs.rollCount = 0
## Output type: CompressedStream (compressed) or DataStream (written as-is)
kafka_flume_hdfs.sinks.k1.hdfs.fileType = CompressedStream
# Compression codec
kafka_flume_hdfs.sinks.k1.hdfs.codeC = lzop

# 5: Wire the components together
kafka_flume_hdfs.sources.r1.channels = c1
kafka_flume_hdfs.sinks.k1.channel = c1
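The TimeStampInterceptor is likewise a custom class in project_v4_flume.jar. The midnight-drift problem it solves: the HDFS sink expands %Y-%m-%d from the event's timestamp header, so a log produced at 23:59:59 but pulled from Kafka after midnight would otherwise land in the next day's directory. The fix is to overwrite that header with the event time carried in the log body. A minimal sketch, assuming each body is JSON with a millisecond epoch field named ts (the field name is an assumption about the log format):

package com.wester.flume.interceptor;

// Hypothetical sketch only; the real class lives in project_v4_flume.jar.
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TimeStampInterceptor implements Interceptor {

    // Assumes bodies like {"ts":1662192000000,...}; adjust to the real format.
    private static final Pattern TS = Pattern.compile("\"ts\"\\s*:\\s*(\\d+)");

    @Override
    public void initialize() { }

    // Copy the event time from the body into the "timestamp" header,
    // which the HDFS sink uses to resolve %Y-%m-%d in hdfs.path.
    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        Matcher m = TS.matcher(body);
        if (m.find()) {
            Map<String, String> headers = event.getHeaders();
            headers.put("timestamp", m.group(1));
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() { }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() { return new TimeStampInterceptor(); }

        @Override
        public void configure(Context context) { }
    }
}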

Start the agent:

nohup /usr/local/flume-1.9.0/bin/flume-ng agent \
  --conf-file /shucang_v4/project_files/flume_jobs/conf/kafka_flume_hdfs.conf \
  -C /shucang_v4/project_files/flume_jobs/jar/project_v4_flume.jar \
  --name kafka_flume_hdfs \
  -Dflume.root.logger=INFO,LOGFILE \
  >/usr/local/flume-1.9.0/logs/kafka_flume_hdfs.log 2>&1 &

3: kafka->flume->hdfs (Option 2)

kafka channel -> hdfs sink (no source needed)

Advantage: one fewer stage (no source) than Option 1, so performance is somewhat higher.

Drawback: with no source there is nowhere to attach an interceptor, so the data must already be cleaned before it reaches Kafka (see the producer sketch after the start command below).

Configuration file: kafka_channel_flume_hdfs.conf

# 1: Define the components
kafka_channel_flume_hdfs.channels = c1
kafka_channel_flume_hdfs.sinks = k1

# 2: Define the source - not needed here

# 3: Define the channel
kafka_channel_flume_hdfs.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
kafka_channel_flume_hdfs.channels.c1.kafka.bootstrap.servers = 192.168.5.103:9092,192.168.5.87:9092,192.168.5.114:9092
kafka_channel_flume_hdfs.channels.c1.kafka.topic = project_v4_topic_log_v2
## Set the consumer group so each run resumes from its last committed offset
kafka_channel_flume_hdfs.channels.c1.kafka.consumer.group.id = kafka_channel_flume_hdfs_v2
# Read plain message bodies without the Flume-assembled header
#kafka_channel_flume_hdfs.channels.c1.parseAsFlumeEvent = false

# 4: Define the sink
kafka_channel_flume_hdfs.sinks.k1.type = hdfs
kafka_channel_flume_hdfs.sinks.k1.hdfs.path = hdfs://hadoop322ha/project/v4/log/topic_log_v2/%Y-%m-%d
# Prefix for the files written to HDFS
kafka_channel_flume_hdfs.sinks.k1.hdfs.filePrefix = logs-
# Whether to round the timestamp used in the path (e.g. hourly buckets);
# if set to true, roundValue and roundUnit must be set as well
kafka_channel_flume_hdfs.sinks.k1.hdfs.round = false
## Keep small files under control
# Roll the file after this many seconds so it never stalls waiting for rollSize;
# 3600 is a typical production value, 20 here only so test output appears quickly
kafka_channel_flume_hdfs.sinks.k1.hdfs.rollInterval = 20
# Roll the file once it reaches this size (128 MB)
kafka_channel_flume_hdfs.sinks.k1.hdfs.rollSize = 134217728
# Roll after this many events; 0 disables count-based rolling
kafka_channel_flume_hdfs.sinks.k1.hdfs.rollCount = 0
## Output type: CompressedStream (compressed) or DataStream (written as-is)
kafka_channel_flume_hdfs.sinks.k1.hdfs.fileType = CompressedStream
# Compression codec
kafka_channel_flume_hdfs.sinks.k1.hdfs.codeC = lzop

# 5: Wire the components together
kafka_channel_flume_hdfs.sinks.k1.channel = c1

Start the agent:

nohup /usr/local/flume-1.9.0/bin/flume-ng agent \
  --conf-file /shucang_v4/project_files/flume_jobs/conf2/kafka_channel_flume_hdfs.conf \
  -C /shucang_v4/project_files/flume_jobs/jar/project_v4_flume.jar \
  --name kafka_channel_flume_hdfs \
  -Dflume.root.logger=INFO,LOGFILE \
  >/usr/local/flume-1.9.0/logs/kafka_channel_flume_hdfs.log 2>&1 &
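Since this pipeline has no source stage, no interceptor can clean the data in flight; records must be complete before they are produced into project_v4_topic_log_v2. A hedged producer-side sketch follows (in the real project the topic may well be fed by another Flume agent; the isComplete() check here is a hypothetical placeholder). Note that feeding the topic with plain text like this requires uncommenting the parseAsFlumeEvent = false line above, because with the default of true the Kafka channel expects Flume-event-encoded messages:

package com.wester.kafka;

// Hypothetical producer-side cleaning, illustrating the constraint above.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CleanedLogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.5.103:9092,192.168.5.87:9092,192.168.5.114:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String line = "{\"ts\":1662192000000,\"page\":\"home\"}"; // sample log line
            // Only forward records that pass the completeness check,
            // since there is no interceptor stage downstream.
            if (isComplete(line)) {
                producer.send(new ProducerRecord<>("project_v4_topic_log_v2", line));
            }
        }
    }

    // Placeholder completeness check; a real implementation would parse JSON.
    private static boolean isComplete(String body) {
        String s = body.trim();
        return s.startsWith("{") && s.endsWith("}");
    }
}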

Flume Series

        Apache Hadoop Ecosystem Deployment - Installing a Flume Collection Node

        Flume Series: Flume Component Architecture

        Flume Series: Using Flume Sources

        Flume Series: Using Flume Channels

        Flume Series: Using Flume Sinks

        Flume Series: Custom Flume Interceptors

        Flume Series: Flume Channel Topologies

        Flume Series: Common Flume Collection Pipeline Examples

        Flume Series: Example - Flume Replicating and Multiplexing

        Flume Series: Example - Flume Load Balancing and Failover

        Flume Series: Example - Flume Aggregation Topology (a common log-collection structure)

        Flume Series: Monitoring Flume with Ganglia


