Flume+Kafka数据采集与清洗

您所在的位置:网站首页 java实现数据采集 Flume+Kafka数据采集与清洗

Flume+Kafka数据采集与清洗

2024-07-17 05:52| 来源: 网络整理| 查看: 265

项目说明

实现功能

模拟实时推荐系统中,数据实时采集与数据预处理,并用Kafka进行数据实时消费功能。

实现场景

用户对商品进行评分,后台实时对其进行获取与分析,并经过计算后,生成实时推荐结果。

项目架构图

流程说明

1、用户在浏览器点击商品对商品进行评分时,调用商品服务的接口。

2、评分接口将用户、商品、评分等信息通过logger输出到文件。

3、Flume监听log文件,将日志信息通过log主题发送到Kafka中。

4、清洗服务接收从log主题发送过来的消息通过关键字过滤出有效信息,并将有效信息通过recommender主题发送到Kafka。

5、推荐服务接收recommender主题的消息,并经过实时算法处理等一系列处理后推送给用户展现。

主要工具说明

Flume

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。

Kafka

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

Zookeeper

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

早期版本的kafka用zookeeper做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zookeeper本身的一些因素以及整个架构较大概率存在单点问题,新版本中确实逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖。

版本说明

Kafka_2.12-2.8.0

编译Kafka源代码的Scala编译器版本号为2.12,Kafka的版本为2.8.0

Flume1.9.0

Flume1.8.0对kafka2.8.0有兼容性问题,因为在Flume1.8.0中,还是用的kafka0.9.0.1的版本包,会造成kafka2.8.0客户端消费消息时的时间戳问题。

工具安装 JDK安装

JDK下载链接

下载jdk-8u281-linux-x64.tar.gz压缩包,并解压缩到hadoop用户的家目录的jvm文件夹

cd ~ mkdir jvm tar -zxf jdk-8u281-linux-x64.tar.gz -C jvm

编辑环境变量:

vim ~/.bashrc

添加JAVA_HOME:

export JAVA_HOME=/home/hadoop/jvm/jdk1.8.0_281 export PATH=$JAVA_HOME/bin

让环境变量生效:

source ~/.bashrc

查看java版本:

java -version

检验环境变量是否正确:

# 检验变量值 echo $JAVA_HOME java -version # 与直接执行 java -version 一样 $JAVA_HOME/bin/java -version Zookeeper安装

Zookeeper下载地址:http://mirrors.cnnic.cn/apache/zookeeper/stable/ 或 http://mirror.bit.edu.cn/apache/zookeeper/stable/。下载apache-zookeeper-3.6.3-bin.tar.gz。

解压文件:

tar -zxf apache-zookeeper-3.6.3-bin.tar.gz -C ./

重命名:

mv apache-zookeeper-3.6.3-bin zookeeper

进入zookeeper文件中创建文件夹:

cd zookeeper mkdir tmp

复制模板配置文件并修改:

cp ./conf/zoo-sample.cfg ./conf/zoo.cfg vim ./conf/zoo.cfg

将dataDir的路径修改为刚才创建的tmp目录路径:

启动zookeeper:

./bin/zkServer.sh start

显示Starting zookeeper … STARTED则表示启动成功。

如果需要停止zookeeper,可以通过stop命令停止:

./bin/zkServer.sh stop Flume-ng安装

通过wget下载flume安装包:

wget https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

解压到家目录:

tar -zxf apache-flume-1.9.0-bin.tar.gz -C ~

重命名:

mv apache-flume-1.9.0-bin flume

在flume的conf目录下新建log-kafka.properties,内容为:

agent.sources = exectail agent.channels = memoryChannel agent.sinks = kafkasink # For each one of the sources, the type is defined agent.sources.exectail.type = exec # 下面这个路径是需要收集日志的绝对路径,改为自己的日志目录 agent.sources.exectail.command = tail -f /home/hadoop/flume/log/agent.log agent.sources.exectail.interceptors=i1 agent.sources.exectail.interceptors.i1.type=regex_filter # 定义日志过滤前缀的正则 agent.sources.exectail.interceptors.i1.regex=.+PRODUCT_RATING_PREFIX.+ # The channel can be defined as follows. agent.sources.exectail.channels = memoryChannel # Each sink's type must be defined agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafkasink.kafka.topic = log agent.sinks.kafkasink.kafka.bootstrap.servers = localhost:9092 agent.sinks.kafkasink.kafka.producer.acks = 1 agent.sinks.kafkasink.kafka.flumeBatchSize = 20 #Specify the channel the sink should use agent.sinks.kafkasink.channel = memoryChannel # Each channel's type is defined. agent.channels.memoryChannel.type = memory # Other config values specific to each type of channel(sink or source) # can be defined as well # In this case, it specifies the capacity of the memory channel agent.channels.memoryChannel.capacity = 10000

上述配置文件功能描述:

使用tail -f /home/hadoop/flume/log/agent.log命令,监听文件中改动的内容,通过正则表达式.+PRODUCT_RATING_PREFIX.+匹配内容,将匹配的结果发送到localhost:9092中Kafka的log主题中。

对于以上配置参数,大致需要明白sources,channels,sinks部分。对于这三部分的关系,官方给出了一张图:

Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成。每一个agent相当于一个数据(被封装成Event对象)传递员,内部有三个组件:

Source

采集组件,用于跟数据源对接,以获取数据。

Sink

下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据。

Channel

传输通道组件,用于从source将数据传递到sink。

进入flume目录,执行启动命令:

cd ~/flume ./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console Kafka安装

通过wget下载安装包:

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz

解压到家目录:

tar -zxf kafka_2.12-2.8.0.tgz -C ~

重命名:

mv kafka_2.12-2.8.0.tgz kafka

进入kafka目录:

cd kafka

修改kafka配置:

vim config/server.properties listeners=PLAINTEXT://:9092 # 192.168.1.43为本机ip advertised.listeners=PLAINTEXT://192.168.1.43:9092 zookeeper.connect=localhost:2181

启动kafka(在zookeeper启动之后):

bin/kafka-server-start.sh -daemon ./config/server.properties

如果需要关闭kafka,执行:

bin/kafka-server-stop.sh

创建主题topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic recommender

在控制台发送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic recommender

输入命令后,控制台会显示需要输入,此时输入的信息在回车之后会发送到kafka中去

ctrl+c退出。

在控制台输出消费消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic recommender

可以通过在两个终端中,一个打开发送消息,另一个打开接收消息。

服务搭建

maven项目结构:

BigData ├── BusinessServer #商品服务 ├── KafkaStreaming #清洗服务 └── StreamingRecommender #推荐服务 商品服务

BusinessServer(SpringBoot项目)

主要提供了一个restful接口,用于将关键信息打印至控制台,并配置日志输出至flume配置中指定的log文件。

评分接口:

package cn.javayuli.businessserver.web; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * 评分controller * * @author 韩桂林 */ @RestController public class RatingController { private static final Logger LOGGER = LoggerFactory.getLogger(RatingController.class); private static final String PRODUCT_RATING_PREFIX = "PRODUCT_RATING_PREFIX"; /** * 用户对商品进行评分 * * @param user 用户 * @param product 商品 * @param score 分数 * @return */ @GetMapping("/rate") public String doRate(@RequestParam String user, @RequestParam String product, @RequestParam Double score) { LOGGER.info(PRODUCT_RATING_PREFIX + ":" + user +"|"+ product +"|"+ score +"|"+ System.currentTimeMillis()/1000); return "SUCCESS"; } }

在application.properties中配置启动端口与log4j文件输出路径:

server.port=7001 logging.file.name=/home/hadoop/flume/log/agent.log

将项目打成jar包,上传至服务器,使用java -jar ****.jar运行项目。

清洗服务

KafkaStreaming(非SpringBoot项目)

引入kafka-streams相关包:

org.apache.kafka kafka-streams 2.8.0 org.apache.kafka kafka-clients 2.8.0

创建一个Processor:

package cn.javayuli.kafkastream.processor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; /** * 日志预处理 * * @author hanguilin */ public class LogProcessor implements Processor { private ProcessorContext context; private static final String PRODUCT_RATING_PREFIX = "PRODUCT_RATING_PREFIX:"; @Override public void init(ProcessorContext context) { this.context = context; } @Override public void process(byte[] key, byte[] value) { String input = new String(value); // 根据前缀过滤日志信息,提取后面的内容 if(input.contains(PRODUCT_RATING_PREFIX)){ System.out.println("product rating coming!!!!" + input); input = input.split(PRODUCT_RATING_PREFIX)[1].trim(); context.forward("logProcessor".getBytes(), input.getBytes()); } } @Override public void close() { } }

创建main函数:

package cn.javayuli.kafkastream; import cn.javayuli.kafkastream.processor.LogProcessor; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; /** * @author hanguilin */ public class KafkaStreamApp { public static void main(String[] args) { // kafka地址 String brokers = "192.168.1.43:9092"; // 定义输入和输出的topic String from = "log"; String to = "recommender"; // 定义kafka streaming的配置 Properties settings = new Properties(); settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter"); settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); // 拓扑建构器 StreamsBuilder builder = new StreamsBuilder(); Topology build = builder.build(); // 定义流处理的拓扑结构 build.addSource("SOURCE", from) .addProcessor("PROCESS", () -> new LogProcessor(), "SOURCE") .addSink("SINK", to, "PROCESS"); KafkaStreams streams = new KafkaStreams(build, settings); streams.start(); } }

将项目打成jar包,上传至服务器,使用java -cp ****.jar cn.javayuli.kafkastream.KafkaStreamApp运行项目。

推荐服务

StreamingRecommender(非SpringBoot项目)

此处只对消息进行消费,不做推荐计算。

main函数:

package cn.javayuli.streamrecommender; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; /** * @author hanguilin */ public class ConsumerApp { public static void main(String[] args){ Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.1.43:9092"); properties.put("group.id", "group-1"); properties.put("enable.auto.commit", "true"); properties.put("auto.commit.interval.ms", "1000"); properties.put("auto.offset.reset", "earliest"); properties.put("session.timeout.ms", "30000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer kafkaConsumer = new KafkaConsumer(properties); kafkaConsumer.subscribe(Arrays.asList("recommender")); while (true) { ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { System.out.printf("offset = %d, value = %s", record.offset(), record.value()); System.out.println(); } } } }

将项目打成jar包,上传至服务器,使用java -cp ****.jar cn.javayuli.streamrecommender.ConsumerApp运行项目。

如果需要在非服务器上对程序进行远程测试,需要打开服务器的7001(BusinessServer)、9092(Kafka)端口,端口命令可以参考文章《CentOS7 中端口命令》。

数据模拟

下发一个评分请求:

首先商品服务会打印出日志:

查看/home/hadoop/flume/log/agent.log

可以看到,商品服务将日志追加到了/home/hadoop/flume/log/agent.log文件中。

此时,Flume监听到了文件内容发生改变,就会将追加的内容发送到Kafka的log主题中。

此时,清洗服务从log主题中获取到包含PRODUCT_RATING_PREFIX的日志信息,并将处理后的信息发送到recommender主题。

(下图中打印的是从log中取出来的数据,非处理后的数据)

由于推荐服务订阅了recommender主题,所以会对消息进行消费。

资源地址

文中只贴出了关键性代码,全部代码请查看git仓库Recommender。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3