5.1.1 电商离线数仓(数仓需求分析、日志采集、表数据加载、json数据处理、 Datax 数据导出、Tez高仿日启动测试)

您所在的位置:网站首页 电商数据报表格式 5.1.1 电商离线数仓(数仓需求分析、日志采集、表数据加载、json数据处理、 Datax 数据导出、Tez高仿日启动测试)

5.1.1 电商离线数仓(数仓需求分析、日志采集、表数据加载、json数据处理、 Datax 数据导出、Tez高仿日启动测试)

2024-03-16 08:53| 来源: 网络整理| 查看: 265

电商离线数仓

文章目录 电商离线数仓一、电商离线数仓设计第1节 需求分析电商业务简介 第2节 数据埋点第3节 数据指标体系第4节 总体架构设计4.1、技术方案选型框架选型**软件选型**服务器选型集群规模规划 4.2、系统逻辑架构4.3、开发物理环境4.4、数据仓库命名规范 二、 电商分析之--会员活跃度第1节 需求分析计算指标:指标口径业务逻辑: 第2节 日志数据采集2.1、taildir source配置2.2、hdfs sink配置2.3、Agent的配置2.4、Flume的优化配置2.5、自定义拦截器2.6、采集启动日志(使用自定义拦截器)2.7、采集启动日志和事件日志2.8 日志数据采集小结 第3节 ODS建表和数据加载第4节 json数据处理4.1、使用内建函数处理4.2、使用UDF处理4.3、使用SerDe处理各种Json格式处理方法小结: 第5节 DWD层建表和数据加载5.1、创建DWD层表5.2、加载DWD层数据 第6节 活跃会员6.1、创建DWS层表6.2、加载DWS层数据6.3、创建ADS层表6.4、加载ADS层数据6.5、小结 第7节 新增会员7.1、创建DWS层表7.2、加载DWS层数据7.3、创建ADS层表7.4、加载ADS层数据7.5、小结 第8节 留存会员8.1、创建DWS层表8.2、加载DWS层数据8.3、创建ADS层表8.4、加载ADS层数据8.5、小结 第9节 Datax 数据导出第10节 高仿日启动数据测试10.1、Hive on MR测试10.2、Tez简介及安装10.3、Hive on Tez测试10.4、会员留存率的计算

一、电商离线数仓设计 第1节 需求分析

近年来,中国的电子商务快速发展,交易额连创新高,电子商务在各领域的应用不断拓展和深化、相关服务业蓬勃发展、支撑体系不断健全完善、创新的动力和能力 不断增强。电子商务正在与实体经济深度融合,进入规模性发展阶段,对经济社会生活 的影响不断增大,正成为我国经济发展的新引擎。

中国电子商务研究中心数据显示,截止到 2012 年底,中国电子商务市场交易规模达7.85万亿人民币,同比增长 30.83%。其中,B2B 电子商务交易额 达 6.25 万亿,同比增长 27%。而 2011 年全年,中国电子商务市场交易额达 6 万亿人民币,同比增长 33%,占 GDP 比重上升到 13%;2012 年,电子商务占 GDP 的比重已经高达15%。

电商行业技术特点

技术新技术范围广分布式高并发、集群、负载均衡海量数据业务复杂系统安全 电商业务简介

类似X东商城、X猫商城。电商网站采用商家入驻的模式,商家入驻平台提交申请,有平台进行资质审核,审核通过后,商家拥有独立的管理后台录入商品信息。商品经过平台审核后即可发布。网上商城主要分为:

网站前台。网站首页、商家首页、商品详细页、搜索页、会员中心、订单与支付相关页面、秒杀频道等;运营商后台。运营人员的管理平台, 主要功能包括:商家审核、品牌管理、规格管理、模板管理、商品分类管理、商品审核、广告类型管理、广告管理、订单查询、商家结算等;商家管理后台。入驻的商家进行管理的平台,主要功能包括:商品管理、订单查询统计、资金结算等功能;

数据仓库项目主要分析以下数据:

日志数据:启动日志、点击日志(广告点击日志)业务数据库的交易数据:用户下单、提交订单、支付、退款等核心交易数据的分析

数据仓库项目分析任务:

会员活跃度分析主题 每日新增会员数;每日、周、月活跃会员数;留存会员数、留存会员率广告业务分析主题 广告点击次数、广告点击购买率、广告曝光次数核心交易分析主题 订单数、成交商品数、支付金额 第2节 数据埋点

数据埋点,将用户的浏览、点击事件采集上报的一套数据采集的方法。 通过这套方法,能够记录到用户在App、网页的一些行为,用来跟踪应用使用的状况,后续用来进一步优化产品或是提供运营的数据支撑,包括访问数、访客数、停留时长、浏览数、跳出率。这样的信息收集可以大致分为两种:页面统计、统计操作行为。

在企业经营中,数据分析辅助决策是非常重要的一环,而埋点采集用户行为数据的工作则是基础中的基础。如果没有用户行为数据,经营分析将无从说起。埋点为数据分析提供基础数据,埋点工作流程可分为:

根据埋点需求完成开发(前端开发工程师 js)App或网页采用用户数据数据上报服务器数据的清洗、加工、存储(大数据工程师)进行数据分析等到相应的指标(大数据工程师)

在以上过程中,涉及的相关人员可分以下几类:

埋点需求:数据产品经理,负责撰写需求文档,规定哪些区域、用户操作需要埋点埋点采集:前端工程师,负责通过一套前端 js 代码对用户的请求事件上送至服务器数据清洗、加工及存储:对埋点中数据缺失、误报等情况需要进行清洗,并通过一定的计算加工,输出业务分析所需要的结构化数据,最后将数据存储在数据仓库中数据分析:在数据仓库中对数据进行整理,成业务关注的指标前端展示:Java 开发

主流的埋点实现方法如下,主要区别是前端开发的工作量:

手动埋点:开发需要手动写代码实现埋点,比如页面ID、区域ID、按钮ID、按钮位置、事件类型(曝光、浏览、点击)等,一般需要公司自主研发的一套埋点框架

优点:埋点数据更加精准缺点:工作量大,容易出错

无痕埋点:不用开发写代码实现的,自动将设备号、浏览器型号、设备类型等数据采集。主要使用第三方统计工具,如友盟、百度移动、魔方等

优点:简单便捷缺点:埋点数据统一,不够个性化和精准

启动日志: 在/data/lagoudw/logs/start 目录下放入启动日志文件,格式如下:

{ "app_active":{ "name":"app_active", "json":{ "entry":"3", "action":"0", "error_code":"0" }, "time":1593553936325 }, "attr":{ "area":"葫芦岛", "uid":"2F10092A192", "app_v":"1.1.12", "event_type":"common", "device_id":"1FB872-9A100192", "os_type":"0.7.0", "channel":"MA", "language":"chinese", "brand":"Huawei-4" } }

事件日志(广告点击、收藏、点赞、消息通知、商品评论、商品详情页加载等事件): 放在 /data/lagoudw/logs/event 目录下

{ "lagou_event":[ { "name":"goods_detail_loading", "json":{ "entry":"2", "goodsid":"0", "loading_time":"71", "action":"3", "staytime":"119", "showtype":"5" }, "time":1594804466872 }, { "name":"notification", "json":{ "action":"3", "type":"4" }, "time":1594775458428 }, { "name":"ad", "json":{ "duration":"19", "ad_action":"0", "shop_id":"46", "event_type":"ad", "ad_type":"2", "show_style":"1", "product_id":"9022", "place":"placeindex_right", "sort":"4" }, "time":1594779518872 }, { "name":"favorites", "json":{ "course_id":2, "id":0, "userid":0 }, "time":1594812897271 } ], "attr":{ "area":"清远", "uid":"2F10092A77", "app_v":"1.1.7", "event_type":"common", "device_id":"1FB872-9A10077", "os_type":"0.8.4", "channel":"PQ", "language":"chinese", "brand":"iphone-2" } } 第3节 数据指标体系

指标:对数据的统计值。如:会员数、活跃会员数、会员留存数;广告点击量;订单金额、订单数都是指标; 指标体系:将各种指标系统的组织起来,按照业务模型、标准对指标进行分类和分层;

没有数据指标体系的团队内数据需求经常表现为需求膨胀以及非常多的需求变更。每个人都有看数据的视角和诉求,然后以非专业的方式创造维度/指标的数据口径。数据分析人员被海量的数据需求缠住,很难抽离出业务规则设计好的解决方案,最终滚雪球似的搭建难以维护的数据仓库。

建立指标体系实际上是与需求方达成一致。能有效遏制不靠谱的需求,让需求变得有条例和体系化;指标体系是知道数据仓库建设的基石。稳定而且体系化的需求,有利于数据仓库方案的优化,和效率提升;

由产品经理牵头、与业务、IT方协助,制定的一套能从维度反应业务状况的一套待实施框架。在建立指标体系时,要注重三个选取原则:准确、可解释、结构性。

准确:核心数据一定要理解到位和准确,不能选错;可解释:所有指标都要配上明确、详细的业务解释。如日活的定义是什么,是使用了App、还是在App中停留了一段时间、或是收藏或购买购买了商品;结构性:能够充分对业务进行解读。如新增用户只是一个大数,还需要知道每个渠道的新增用户,每个渠道的新增转化率,每个渠道的新增用户价值等。

在建立指标体系之前,先了解一下指标的构成,在工作过程中遇见的指标多为派生性指标。指标的构成如下所示:

基础指标 + [ 修饰词 ] + 时间段修饰词是可选的;基础指标和时间段是必须的基础指标是不可拆分的指标,如:交易额、支付金额、下单数修饰词多是某种场景的表现,如:通过搜索带来的交易等时间段即为一个时间周期,如:双十一期间,618活动期间等

三者叠加在一起就形成业务上常用的指标(这些指标也是派生指标),如:双11这一天通过搜索带来的交易额、双11这一天的交易额。同样,像此类日活、月活、次日留存、日转化率等都属于派生指标。

在筛选完合理指标后,就要着手建立对应的指标体系。主要分为四个步骤:理清业务阶段和需求、确定核心指标、对指标进行维度的拆解、指标的落地;

1、厘清业务阶段及需求 企业的发展往往分为三个阶段:创业期、上升期、成熟发展期,不同的阶段关注的核心指标也是不同的。

业务前期,最关注用户量,此时的指标体系应该紧密围绕用户量的提升来做各种维度的拆解业务中期,除了关注用户量的走势大小,更加重要的是优化当前的用户量结构,比如看用户留存,如果留存偏低,那就需要进一步分析查找原因成熟发展期,更多关注的就是产品变现能力和市场份额,要关注收入指标、各种商业化模式的的收入,同时做好市场份额和竞品的监控,以防止新起势力抢占份额等

2、确定核心指标 这个阶段最重要的是找到正确的核心指标。

例:某款产品的日活口径是打开APP,而且日活量不小,而且稳定上升。然而分析时 发现,打开APP的用户中,5秒跳出率高达25%,这是非常不健康的,那么当前的核 心指标日活实际上已经有了问题,更加好的核心指标应该是停留时长大于5秒的用户 数。

每个APP的核心指标都不太一样,一定要花时间去考虑这件事。就像XX头条APP,它 的日活和留存指标一定非常高,但仅关注这种指标肯定是不对的,它的真正核心指标 绝对不是单纯的日活和留存。

3、核心指标维度拆解 核心指标的波动必然是某种维度的波动引起,要监控核心指标,本质上还是要监控维 度核心指标。

在分析“进入APP用户数”指标时,要关注渠道转化率,分析用户从哪里来;同时用户 通过哪种方式打开的,如通过点击桌面图标、点击通知栏、点击Push等;

在分析“停留时长大于5秒占比”指标时,要重点关注停留时长的分布,停留1秒 – 5秒 的用户各有多少,具体分布情况;停留大于5秒的用户特征和行为特性是怎么样的情 况;停留小于5秒的用户特征等;

电商平台注重交易额,在真正达成交易之前,用户要打开APP、选择商品、确认订 单、支付订单等整个交流漏斗模型。每一个环节的关键指标都可以通过公式的形式进 行拆解,在根据拆解公式逐个分析对应的影响因素。

4、指标宣贯、存档、落地 在完成整个指标体系搭建后,要告知所有相关业务人员。一方面为下一步工作做铺 垫,另一方面是为了让所有相关人员知晓已完成,以防甩锅;

对指标口径的业务逻辑进行详细的描述并存档,只有明确、清晰的定义才能明白指标 的具体含义;

就是建立核心指标的相关报表,实际工作中,报表会在埋点前建好的,这样的话一旦 版本上线就能立刻看到数据,而且也比较容易发现问题。

整个指标体系的搭建主要是由产品经理主导完成的,业务人员需要配合产品经理选择 并确认指标,这也是在建立之初最重要的一点。

第4节 总体架构设计 4.1、技术方案选型

框架选型 软件选型 服务器选型 集群规模的估算

框架选型

Apache / 第三方发行版(CDH / HDP / Fusion Insight) Apache社区版本 优点:

完全开源免费社区活跃文档、资料详实 缺点:复杂的版本管理复杂的集群安装复杂的集群运维复杂的生态环境

第三方发行版本(CDH / HDP / Fusion Insight) Hadoop遵从Apache开源协议,用户可以免费地任意使用和修改Hadoop。正因如 此,市面上有很多厂家在Apache Hadoop的基础上开发自己的产品。如Cloudera的 CDH,Hortonworks的HDP,华为的Fusion Insight等。这些产品的优点是:

主要功能与社区版一致版本管理清晰。比如Cloudera,CDH1,CDH2,CDH3,CDH4等,后面加上补 丁版本,如CDH4.1.0 patch level 923.142比 Apache Hadoop 在兼容性、安全性、稳定性上有增强。第三方发行版通常都 经过了大量的测试验证,有众多部署实例,大量的运用到各种生产环境版本更新快。如CDH每个季度会有一个update,每一年会有一个release基于稳定版本Apache Hadoop,并应用了最新Bug修复或Feature的patch提供了部署、安装、配置工具,大大提高了集群部署的效率,可以在几个小时内 部署好集群运维简单。提供了管理、监控、诊断、配置修改的工具,管理配置方便,定位问 题快速、准确,使运维工作简单,有效

CDH:最成型的发行版本,拥有最多的部署案例。提供强大的部署、管理和监控工 具。国内使用最多的版本;拥有强大的社区支持,当遇到问题时,能够通过社区、论 坛等网络资源快速获取解决方法;

HDP:100%开源,可以进行二次开发,但没有CDH稳定。国内使用相对较少;

Fusion Insight:华为基于hadoop2.7.2版开发的,坚持分层,解耦,开放的原则, 得益于高可靠性,在全国各地政府、运营商、金融系统有较多案例。

软件选型

数据采集:DataX、Flume、Sqoop、Logstash、Kafka 数据存储:HDFS、HBase 数据计算:Hive、MapReduce、Tez、Spark、Flink 调度系统:Airflow、azkaban、Oozie 元数据管理:Atlas 数据质量管理:Griffin 即席查询:Impala、Kylin、ClickHouse、Presto、Druid 其他:MySQL

框架、软件尽量不要选择最新的版本,选择半年前左右稳定的版本。 在这里插入图片描述

服务器选型

选择物理机还是云主机 机器成本考虑:物理机的价格 > 云主机的价格 运维成本考虑:物理机需要有专业的运维人员;云主机的运维工作由供应商完成,运 维相对容易,成本相对较低;

集群规模规划

如何确认集群规模(假设:每台服务器20T硬盘,128G内存) 可以从计算能力(CPU、 内存)、存储量等方面着手考虑集群规模。 假设: 1、每天的日活用户500万,平均每人每天有100条日志信息 2、每条日志大小1K左右 3、不考虑历史数据,半年集群不扩容 4、数据3个副本 5、离线数据仓库应用

需要多大集群规模? 要分析的数据有两部分:日志数据+业务数据 每天日志数据量:500W * 100 * 1K / 1024 / 1024 = 500G 半年需要的存储量:500G * 3 * 180 / 1024 = 260T 通常要给磁盘预留20-30%的空间(这里取25%): 260 * 1.25 = 325T 数据仓库应用有1-2倍的数据膨胀(这里取1.5):500T 需要大约25个节点

其他未考虑因素:数据压缩、业务数据 以上估算的生产环境。实际上除了生产环境以外,还需要开发测试环境,这也需要一 定数据的机器。

4.2、系统逻辑架构

在这里插入图片描述

4.3、开发物理环境

5台物理机;500G数据盘;32G内存;8个core

在这里插入图片描述

在这里插入图片描述 这里根据本地实际情况,去掉了Hadoop3、Hadoop4两台

关于数据集的说明: 1、在开发过程中使用小规模数据集 2、模块测试使用真实的数据集(数据量大) 3、在做项目期间根据自己实际情况使用不同的数据量(建议使用小规模的数据集)

4.4、数据仓库命名规范 1 数据库命名 命名规则:数仓对应分层 命名示例:ods / dwd / dws/ dim / temp / ads 2 数仓各层对应数据库 ods层 -> ods_{业务线|业务项目} dw层 -> dwd_{业务线|业务项目} + dws_{业务线|业务项目} dim层 -> dim_维表 ads层 -> ads_{业务线|业务项目} (统计指标等) 临时数据 -> temp_{业务线|业务项目} 备注:本项目未采用 3 表命名(数据库表命名规则) \* ODS层: 命名规则:ods_{业务线|业务项目}_[数据来源类型]_{业务} \* DWD层: 命名规则:dwd_{业务线|业务项目}_{主题域}_{子业务} \* DWS层: 命名规则:dws_{业务线|业务项目}_{主题域}_{汇总相关粒度}_{汇总时间周 期} \* ADS层: 命名规则:ads_{业务线|业务项目}_{统计业务}_{报表form|热门排序topN} \* DIM层: 命名规则:dim_{业务线|业务项目|pub公共}_{维度}

创建数据库: hive里

create database if not exists ods; create database if not exists dwd; create database if not exists dws; create database if not exists ads; create database if not exists dim; create database if not exists tmp; 二、 电商分析之–会员活跃度 第1节 需求分析

会员数据是后期营销的很重要的数据。网店会专门针对会员进行一系列营销活动。 电商会员一般门槛较低,注册网站即可加入。有些电商平台的高级会员具有时效性, 需要购买VIP会员卡或一年内消费额达到多少才能成为高级会员。

计算指标:

新增会员:每日新增会员数 活跃会员:每日,每周,每月的活跃会员数 会员留存:1日,2日,3日会员留存数、1日,2日,3日会员留存率

指标口径业务逻辑:

会员:以设备为判断标准,每个独立设备认为是一个会员。Android系统通常根据 IMEI号,IOS系统通常根据OpenUDID 来标识一个独立会员,每部移动设备是一个会 员; 活跃会员:打开应用的会员即为活跃会员,暂不考虑用户的实际使用情况。一台设备 每天多次打开计算为一个活跃会员。在自然周内启动过应用的会员为周活跃会员,同 理还有月活跃会员; 会员活跃率:一天内活跃会员数与总会员数的比率是日活跃率;还有周活跃率(自然 周)、月活跃率(自然月); 新增会员:第一次使用应用的用户,定义为新增会员;卸载再次安装的设备,不会被 算作一次新增。新增用户包括日新增会员、周(自然周)新增会员、月(自然月)新 增会员; 留存会员与留存率:某段时间的新增会员,经过一段时间后,仍继续使用应用认为是 留存会员;这部分会员占当时新增会员的比例为留存率。

已知条件: 1、明确了需求 2、输入:启动日志(OK)、事件日志 3、输出:新增会员、活跃会员、留存会员 4、日志文件、ODS、DWD、DWS、ADS(输出) 下一步作什么? 数据采集:日志文件 => Flume => HDFS => ODS

第2节 日志数据采集

原始日志数据(一条启动日志)

/data/lagoudw/logs/start/start1.log

内容如下

2020-07-30 14:18:47.339 [main] INFO com.lagou.ecommerce.AppStart - {"app_active": {"name":"app_active","json": {"entry":"1","action":"1","error_code":"0"},"time":159611188852 9},"attr":{"area":"泰 安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","d evice_id":"1FB872- 9A1009","os_type":"4.7.3","channel":"DK","language":"chinese"," brand":"iphone-9"}}

在这里插入图片描述 选择Flume作为采集日志数据的工具:

Flume 1.6 无论是Spooling Directory Source、Exec Source均不能很好的满足动态实时收集的需求 Flume 1.8+ 提供了一个非常好用的 Taildir Source使用该source,可以监控多个目录,对目录中新写入的数据进行实时采集 2.1、taildir source配置

taildir Source的特点:

使用正则表达式匹配目录中的文件名监控的文件中,一旦有数据写入,Flume就会将信息写入到指定的Sink高可靠,不会丢失数据不会对跟踪文件有任何处理,不会重命名也不会删除不支持Windows,不能读二进制文件。支持按行读取文本文件

taildir source配置

a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /data/lagoudw/conf/startlog_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/start/.*log positionFile 配置检查点文件的路径,检查点文件会以 json 格式保存已经读取文件的位置, 解决断点续传的问题filegroups 指定filegroups,可以有多个,以空格分隔(taildir source可同时监控多个目录 中的文件)filegroups. 配置每个filegroup的文件绝对路径,文件名可以用正则表达式匹配 2.2、hdfs sink配置 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/ a1.sinks.k1.hdfs.filePrefix = startlog. a1.sinks.k1.hdfs.fileType = DataStream # 配置文件滚动方式(文件大小32M) a1.sinks.k1.hdfs.rollSize = 33554432 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.idleTimeout = 0 a1.sinks.k1.hdfs.minBlockReplicas = 1 # 向hdfs上刷新的event的个数 a1.sinks.k1.hdfs.batchSize = 100 # 使用本地时间 a1.sinks.k1.hdfs.useLocalTimeStamp = true

HDFS Sink 都会采用滚动生成文件的方式,滚动生成文件的策略有:

基于时间。hdfs.rollInterval 30秒基于文件大小。hdfs.rollSize 1024字节基于event数量。hdfs.rollCount 10个event基于文件空闲时间。hdfs.idleTimeout 0 0,禁用minBlockReplicas。默认值与 hdfs 副本数一致。设为1是为了让 Flume 感知不 到hdfs的块复制,此时其他的滚动方式配置(时间间隔、文件大小、events数 量)才不会受影响 2.3、Agent的配置

/data/lagoudw/conf/flume-log2hdfs1.conf

a1.sources = r1 a1.sinks = k1 a1.channels = c1 # taildir source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /data/lagoudw/conf/startlog_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/start/.*log # memorychannel a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 2000 # hdfs sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/ a1.sinks.k1.hdfs.filePrefix = startlog. a1.sinks.k1.hdfs.fileType = DataStream # 配置文件滚动方式(文件大小32M) a1.sinks.k1.hdfs.rollSize = 33554432 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.idleTimeout = 0 a1.sinks.k1.hdfs.minBlockReplicas = 1 # 向hdfs上刷新的event的个数 a1.sinks.k1.hdfs.batchSize = 1000 # 使用本地时间 a1.sinks.k1.hdfs.useLocalTimeStamp = true # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 2.4、Flume的优化配置

1、启动agent

[root@linux122 ~]flume-ng agent --conf-file /data/lagoudw/conf/flume-log2hdfs1.conf -name a1 -Dflume.roog.logger=INFO,console

2、向 /data/lagoudw/logs/ 目录中放入日志文件,报错: java.lang.OutOfMemoryError: GC overhead limit exceeded 在这里插入图片描述 缺省情况下 Flume jvm堆最大分配20m,这个值太小,需要调整。

3、解决方案:在 $FLUME_HOME/conf/flume-env.sh 中增加以下内容

export JAVA_OPTS="-Xms4000m -Xmx4000m - Dcom.sun.management.jmxremote" # 要想使配置文件生效,还要在命令行中指定配置文件目录 flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf --conf-file /data/lagoudw/conf/flume-log2hdfs1.conf -name a1 -Dflume.roog.logger=INFO,console

Flume内存参数设置及优化:

根据日志数据量的大小,Jvm堆一般要设置为4G或更高-Xms -Xmx 最好设置一致,减少内存抖动带来的性能影响

存在的问题:Flume放数据时,使用本地时间;不理会日志的时间戳

2.5、自定义拦截器

前面 Flume Agent 的配置使用了本地时间,可能导致数据存放的路径不正确。 要解决以上问题需要使用自定义拦截器。 agent用于测试自定义拦截器。netcat source =>logger sink /data/lagoudw/conf/flumetest1.conf

# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # source a1.sources.r1.type = netcat a1.sources.r1.bind = hadoop2 a1.sources.r1.port = 9999 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type =cn.lagou.dw.flume.interceptor.CustomerInterceptor$Builder # channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 # sink a1.sinks.k1.type = logger # source、channel、sink之间的关系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

自定义拦截器的原理: 1、自定义拦截器要集成Flume 的 Interceptor 2、Event 分为header 和 body(接收的字符串) 3、获取header和body 4、从body中获取"time":1596382570539,并将时间戳转换为字符串 “yyyy-MM-dd” 5、将转换后的字符串放置header中

自定义拦截器的实现: 1、获取 event 的 header 2、获取 event 的 body 3、解析body获取json串 4、解析json串获取时间戳 5、将时间戳转换为字符串 “yyyy-MM-dd” 6、将转换后的字符串放置header中 7、返回event

创建MAVEN项目 pom.xml < scope>provided< /scope> 这个属性代表打包时无须打入这个

4.0.0 org.example cn.lagou.dw 1.0-SNAPSHOT org.apache.flume flume-ng-core 1.9.0 provided com.alibaba fastjson 1.1.23 junit junit 4.12 provided UTF-8 maven-compiler-plugin 2.3.2 1.8 1.8 maven-assembly-plugin jar-with-dependencies make-assembly package single

CustomerInterceptor.java

package cn.lagou.dw.flume.interceptor; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.commons.compress.utils.Charsets; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.event.SimpleEvent; import org.apache.flume.interceptor.Interceptor; import org.junit.Test; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class CustomerInterceptor implements Interceptor { @Override public void initialize() { } @Override // 逐条处理event public Event intercept(Event event) { // 获取 event 的 body String eventBody = new String(event.getBody(), Charsets.UTF_8); // 获取 event 的 header Map headersMap = event.getHeaders(); // 解析body获取json串 String[] bodyArr = eventBody.split("\\s+"); try { String jsonStr = bodyArr[6]; // 解析json串获取时间戳 JSONObject jsonObject = JSON.parseObject(jsonStr); String timestampStr = jsonObject.getJSONObject("app_active").getString("time"); // 将时间戳转换为字符串 "yyyy-MM-dd" // 将字符串转换为Long long timestamp = Long.parseLong(timestampStr); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); Instant instant = Instant.ofEpochMilli(timestamp); LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); String date = formatter.format(localDateTime); // 将转换后的字符串放置header中 headersMap.put("logtime", date); event.setHeaders(headersMap); } catch (Exception e) { headersMap.put("logtime", "Unknown"); event.setHeaders(headersMap); } return event; } @Override public List intercept(List events) { List lstEvent = new ArrayList(); for (Event event : events) { Event outEvent = intercept(event); if (outEvent != null) { lstEvent.add(outEvent); } } return lstEvent; } @Override public void close() { } public static class Builder implements Interceptor.Builder { @Override public Interceptor build() { return new CustomerInterceptor(); } @Override public void configure(Context context) { } } @Test public void testJunit() { String str = "2020-08-20 12:00:58.395 [main] INFO com.lagou.ecommerce.AppEvent - {\"app_active\":{\"name\":\"app_active\",\"json\":{\"entry\":\"1\",\"action\":\"0\",\"error_code\":\"0\"},\"time\":1596342840284},\"attr\":{\"area\":\"大庆\",\"uid\":\"2F10092A2\",\"app_v\":\"1.1.15\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1002\",\"os_type\":\"2.8\",\"channel\":\"TB\",\"language\":\"chinese\",\"brand\":\"iphone-8\"}}"; Map map = new HashMap(); // new Event Event event = new SimpleEvent(); event.setHeaders(map); event.setBody(str.getBytes(Charsets.UTF_8)); // 调用interceptor处理event CustomerInterceptor customerInterceptor = new CustomerInterceptor(); Event outEvent = customerInterceptor.intercept(event); // 处理结果 Map headersMap = outEvent.getHeaders(); System.out.println(JSON.toJSONString(headersMap)); } }

执行测试类: 在这里插入图片描述

将程序打包,放在 flume/lib目录下; 在这里插入图片描述 将打包文件cn.lagou.dw-1.0-SNAPSHOT-jar-with-dependencies.jar 拷贝到/data/lagoudw/jars/目录下 建立软连接:

[root@linux122 ~]# ln -s /data/lagoudw/jars/cn.lagou.dw-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/lagou/servers/flume-1.9.0/lib/cn.lagou.dw-1.0-SNAPSHOT-jar-with-dependencies.jar

启动Agent测试

flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf --conf-file /data/lagoudw/conf/flumetest1.conf -name a1 -Dflume.roog.logger=INFO,console

在这里插入图片描述 在这里插入图片描述

2.6、采集启动日志(使用自定义拦截器)

1、定义配置文件 /data/lagoudw/conf/flume-log2hdfs2.conf

a1.sources = r1 a1.sinks = k1 a1.channels = c1 # taildir source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile =/data/lagoudw/conf/startlog_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/start/.*log a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type =cn.lagou.dw.flume.interceptor.CustomerInterceptor$Builder # memorychannel a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 2000 # hdfs sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /user/data/logs/start/dt=%{logtime}/ a1.sinks.k1.hdfs.filePrefix = startlog. a1.sinks.k1.hdfs.fileType = DataStream # 配置文件滚动方式(文件大小32M) a1.sinks.k1.hdfs.rollSize = 33554432 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.idleTimeout = 0 a1.sinks.k1.hdfs.minBlockReplicas = 1 # 向hdfs上刷新的event的个数 a1.sinks.k1.hdfs.batchSize = 1000 # 使用本地时间 # a1.sinks.k1.hdfs.useLocalTimeStamp = true # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

修改:

给source增加自定义拦截器去掉本地时间戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true根据header中的logtime写文件

2、启动服务

# 测试 flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/lagoudw/conf/flume-log2hdfs2.conf -name a1 -Dflume.root.logger=INFO,console

3、拷贝日志

[root@linux122 ~]# cp /data/start1.log /data/lagoudw/logs/start/start1.log

在这里插入图片描述

4、检查HDFS文件

[root@linux122 ~]# hdfs dfs -ls /user/data/logs/start

在这里插入图片描述

2.7、采集启动日志和事件日志

本系统中要采集两种日志:启动日志、事件日志,不同的日志放置在不同的目录下。 要想一次拿到全部日志需要监控多个目录。 在这里插入图片描述 总体思路 1、taildir监控多个目录 2、修改自定义拦截器,不同来源的数据加上不同标志 3、hdfs sink 根据标志写文件

Agent配置 /data/lagoudw/conf/flume-log2hdfs3.conf

a1.sources = r1 a1.sinks = k1 a1.channels = c1 # taildir source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /data/lagoudw/conf/startlog_position.json a1.sources.r1.filegroups = f1 f2 a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/start/.*log a1.sources.r1.headers.f1.logtype = start a1.sources.r1.filegroups.f2 = /data/lagoudw/logs/event/.*log a1.sources.r1.headers.f2.logtype = event # 自定义拦截器 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type =cn.lagou.dw.flume.interceptor.LogTypeInterceptor$Builder # memorychannel a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 2000 # hdfs sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/ a1.sinks.k1.hdfs.filePrefix = startlog. a1.sinks.k1.hdfs.fileType = DataStream # 配置文件滚动方式(文件大小32M) a1.sinks.k1.hdfs.rollSize = 33554432 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.idleTimeout = 0 a1.sinks.k1.hdfs.minBlockReplicas = 1 # 向hdfs上刷新的event的个数 a1.sinks.k1.hdfs.batchSize = 1000 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 filegroups 指定filegroups,可以有多个,以空格分隔(taildir source可同时监控多个目录中的文件)headers.< filegroupName>.< headerKey> 给 event 增加header key。不同的filegroup,可配置不同的value

自定义拦截器 编码完成后打包上传服务器,放置在$FLUME_HOME/lib 下

package cn.lagou.dw.flume.interceptor; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import org.apache.commons.compress.utils.Charsets; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.event.SimpleEvent; import org.apache.flume.interceptor.Interceptor; import org.junit.Test; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class LogTypeInterceptor implements Interceptor { @Override public void initialize() { } @Override // 逐条处理event public Event intercept(Event event) { // 获取 event 的 body String eventBody = new String(event.getBody(), Charsets.UTF_8); // 获取 event 的 header Map headersMap = event.getHeaders(); // 解析body获取json串 String[] bodyArr = eventBody.split("\\s+"); try { String jsonStr = bodyArr[6]; // 解析json串获取时间戳 String timestampStr = ""; JSONObject jsonObject = JSON.parseObject(jsonStr); if (headersMap.getOrDefault("logtype", "").equals("start")) { // 取启动日志的时间戳 timestampStr = jsonObject.getJSONObject("app_active").getString("time"); } else if (headersMap.getOrDefault("logtype", "").equals("event")) { // 取事件日志第一条记录的时间戳 JSONArray jsonArray = jsonObject.getJSONArray("lagou_event"); if (jsonArray.size() > 0) { timestampStr = jsonArray.getJSONObject(0).getString("time"); } } // 将时间戳转换为字符串 "yyyy-MM-dd" // 将字符串转换为Long long timestamp = Long.parseLong(timestampStr); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); Instant instant = Instant.ofEpochMilli(timestamp); LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); String date = formatter.format(localDateTime); // 将转换后的字符串放置header中 headersMap.put("logtime", date); event.setHeaders(headersMap); } catch (Exception e) { headersMap.put("logtime", "Unknown"); event.setHeaders(headersMap); } return event; } @Override public List intercept(List events) { List lstEvent = new ArrayList(); for (Event event : events) { Event outEvent = intercept(event); if (outEvent != null) { lstEvent.add(outEvent); } } return lstEvent; } @Override public void close() { } public static class Builder implements Interceptor.Builder { @Override public Interceptor build() { return new LogTypeInterceptor(); } @Override public void configure(Context context) { } } @Test public void startJunit() { String str = "2020-08-02 18:19:32.959 [main] INFO com.lagou.ecommerce.AppStart - {\"app_active\":{\"name\":\"app_active\",\"json\":{\"entry\":\"1\",\"action\":\"0\",\"error_code\":\"0\"},\"time\":1596342840284},\"attr\":{\"area\":\"大庆\",\"uid\":\"2F10092A2\",\"app_v\":\"1.1.15\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1002\",\"os_type\":\"2.8\",\"channel\":\"TB\",\"language\":\"chinese\",\"brand\":\"iphone-8\"}}"; Map map = new HashMap(); // new Event Event event = new SimpleEvent(); map.put("logtype", "start"); event.setHeaders(map); event.setBody(str.getBytes(Charsets.UTF_8)); // 调用interceptor处理event LogTypeInterceptor customerInterceptor = new LogTypeInterceptor(); Event outEvent = customerInterceptor.intercept(event); // 处理结果 Map headersMap = outEvent.getHeaders(); System.out.println(JSON.toJSONString(headersMap)); } @Test public void eventJunit() { String str = "2020-08-02 18:20:11.877 [main] INFO com.lagou.ecommerce.AppEvent - {\"lagou_event\":[{\"name\":\"goods_detail_loading\",\"json\":{\"entry\":\"1\",\"goodsid\":\"0\",\"loading_time\":\"93\",\"action\":\"3\",\"staytime\":\"56\",\"showtype\":\"2\"},\"time\":1596343881690},{\"name\":\"loading\",\"json\":{\"loading_time\":\"15\",\"action\":\"3\",\"loading_type\":\"3\",\"type\":\"1\"},\"time\":1596356988428},{\"name\":\"notification\",\"json\":{\"action\":\"1\",\"type\":\"2\"},\"time\":1596374167278},{\"name\":\"favorites\",\"json\":{\"course_id\":1,\"id\":0,\"userid\":0},\"time\":1596350933962}],\"attr\":{\"area\":\"长治\",\"uid\":\"2F10092A4\",\"app_v\":\"1.1.14\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1004\",\"os_type\":\"0.5.0\",\"channel\":\"QL\",\"language\":\"chinese\",\"brand\":\"xiaomi-0\"}}"; Map map = new HashMap(); // new Event Event event = new SimpleEvent(); map.put("logtype", "event"); event.setHeaders(map); event.setBody(str.getBytes(Charsets.UTF_8)); // 调用interceptor处理event LogTypeInterceptor customerInterceptor = new LogTypeInterceptor(); Event outEvent = customerInterceptor.intercept(event); // 处理结果 Map headersMap = outEvent.getHeaders(); System.out.println(JSON.toJSONString(headersMap)); } }

执行测试方法: 在这里插入图片描述 在这里插入图片描述

测试 启动Agent,拷贝日志,检查HDFS文件

## 清理环境 #本地 rm -f /data/lagoudw/conf/startlog_position.json rm -f /data/lagoudw/logs/start/*.log rm -f /data/lagoudw/logs/event/*.log #hdfs上 hdfs dfs -rm -f -r /user/data/logs/start/* hdfs dfs -rm -f -r /user/data/logs/event/* # 启动 Agent flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf --conf-file /data/lagoudw/conf/flume-log2hdfs3.conf -name a1 -Dflume.root.logger=INFO,console # 拷贝日志 [root@linux122 ~]# cp /data/lagoudw/logs/data/start1.log /data/lagoudw/logs/start/ [root@linux122 ~]# cp /data/lagoudw/logs/data/event1.log /data/lagoudw/logs/event/ # 检查HDFS文件 hdfs dfs -ls /user/data/logs/event hdfs dfs -ls /user/data/logs/start # 生产环境中用以下方式启动Agent nohup flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf --conf-file /data/lagoudw/conf/flume-log2hdfs3.conf -name a1 -Dflume.root.logger=INFO,LOGFILE > /dev/null 2>&1 &

在这里插入图片描述 在这里插入图片描述

nohup,该命令允许用户退出帐户/关闭终端之后继续运行相应的进程/dev/null,代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称黑洞标准输入0,从键盘获得输入 /proc/self/fd/0标准输出1,输出到屏幕(控制台) /proc/self/fd/1错误输出2,输出到屏幕(控制台) /proc/self/fd/2

/dev/null 标准输出1重定向到 /dev/null 中,此时标准输出不存在,没有任何地方能够找到输出的内容

2>&1 错误输出将会和标准输出输出到同一个地方

/dev/null 2>&1 不会输出任何信息到控制台,也不会有任何信息输出到文件中

2.8 日志数据采集小结 使用taildir source 监控指定的多个目录,可以给不同目录的日志加上不同header在每个目录中可以使用正则匹配多个文件使用自定义拦截器,主要功能是从json串中获取时间戳,加到event的header中hdfs sink使用event header中的信息写数据(控制写文件的位置)hdfs文件的滚动方式(基于文件大小、基于event数量、基于时间)调节flume jvm内存的分配 第3节 ODS建表和数据加载

在这里插入图片描述 ODS层的数据与源数据的格式基本相同。 hive里创建: 创建ODS层表:

use ODS; create external table ods.ods_start_log( `str` string) comment '用户启动日志信息' partitioned by (`dt` string) location '/user/data/logs/start'; -- 加载数据的功能(测试时使用) alter table ods.ods_start_log add partition(dt='2020-08-02'); alter table ods.ods_start_log drop partition (dt='2020-08-02');

在这里插入图片描述 查询数据: 在这里插入图片描述

加载启动日志数据: /data/lagoudw/script/member_active/ods_load_startlog.sh 可以传参数确定日志,如果没有传参使用昨天日期 会自动加上昨天的partition

#!/bin/bash APP=ODS hive=/opt/lagou/servers/hive-2.3.7/bin/hive # 可以输入日期;如果未输入日期取昨天的时间 if [ -n "$1" ] then do_date=$1 else do_date=`date -d "-1 day" +%F` fi # 定义要执行的SQL sql=" alter table "$APP".ods_start_log add partition(dt='$do_date'); " $hive -e "$sql" 第4节 json数据处理

数据文件中每行必须是一个完整的 json 串,一个 json串 不能跨越多行。 Hive 处理json数据总体来说有三个办法:

使用内建的函数get_json_object、json_tuple使用自定义的UDF第三方的SerDe 4.1、使用内建函数处理

get_json_object(string json_string, string path) 返回值:String 说明:解析json字符串json_string,返回path指定的内容;如果输入的json字符串无效,那么返回NUll;函数每次只能返回一个数据项;

json_tuple(jsonStr, k1, k2, …) 返回值:所有的输入参数、输出参数都是String; 说明:参数为一组键k1,k2,。。。。。和json字符串,返回值的元组。该方法比get_json_object高效,因此可以在一次调用中输入多个键;

explode,使用explod将Hive一行中复杂的 array 或 map 结构拆分成多行。

测试数据: 创建/data/lagoudw/logs/data/weibo.json 文件

user1;18;male;{"id": 1,"ids": [101,102,103],"total_number": 3} user2;20;female;{"id": 2,"ids":[201,202,203,204],"total_number": 4} user3;23;male;{"id": 3,"ids":[301,302,303,304,305],"total_number": 5} user4;17;male;{"id": 4,"ids": [401,402,403,304],"total_number":5} user5;35;female;{"id": 5,"ids": [501,502,503],"total_number":3}

建表加载数据:

create database test; use test; CREATE TABLE IF NOT EXISTS jsont1( username string, age int, sex string, json string )row format delimited fields terminated by ';'; load data local inpath '/data/lagoudw/logs/data/weibo.json' overwrite into table jsont1;

json的处理:

-- get 单层值 select username, age, sex, get_json_object(json, "$.id") id, get_json_object(json, "$.ids") ids, get_json_object(json, "$.total_number") num from jsont1; -- get 数组 select username, age, sex, get_json_object(json, "$.id") id, get_json_object(json, "$.ids[0]") ids0, get_json_object(json, "$.ids[1]") ids1, get_json_object(json, "$.ids[2]") ids2, get_json_object(json, "$.ids[3]") ids3, get_json_object(json, "$.total_number") num from jsont1; -- 使用 json_tuple 一次处理多个字段 select json_tuple(json, 'id', 'ids', 'total_number') from jsont1; -- 有语法错误 select username, age, sex, json_tuple(json, 'id', 'ids', 'total_number') from jsont1; -- 使用 explode + lateral view -- 在上一步的基础上,再将数据展开 -- 第一步,将 [101,102,103] 中的 [ ] 替换掉 -- select "[101,102,103]" -- select "101,102,103" select regexp_replace("[101,102,103]", "\\[|\\]", ""); -- 第二步,将上一步的字符串变为数组 select split(regexp_replace("[101,102,103]", "\\[|\\]", ""), ","); -- 第三步,使用explode + lateral view 将数据展开 select username, age, sex, id, ids, num from jsont1 lateral view json_tuple(json, 'id', 'ids', 'total_number') t1 as id, ids, num; with tmp as( select username, age, sex, id, ids, num from jsont1 lateral view json_tuple(json, 'id', 'ids', 'total_number') t1 as id, ids, num ) select username, age, sex, id, ids1, num from tmp lateral view explode(split(regexp_replace(ids, "\\[|\\]", ""), ",")) t1 as ids1;

在这里插入图片描述

小结:json_tuple 优点是一次可以解析多个json字段,对嵌套结果的解析操作复杂;

4.2、使用UDF处理

自定义UDF处理json串中的数组。自定义UDF函数:

输入:json串、数组的key输出:字符串数组

pom文件增加依赖

org.apache.hive hive-exec 2.3.7 provided

ParseJsonArray

package cn.lagou.dw.hive.udf; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import com.google.common.base.Strings; import org.apache.hadoop.hive.ql.exec.UDF; import org.junit.Test; import java.util.ArrayList; public class ParseJsonArray extends UDF { public ArrayList evaluate(String jsonStr, String arrKey) { if (Strings.isNullOrEmpty(jsonStr)) { return null; } try { // 获取jsonArray JSONObject object = JSON.parseObject(jsonStr); JSONArray jsonArray = object.getJSONArray(arrKey); ArrayList result = new ArrayList(); for (Object o : jsonArray) { result.add(o.toString()); } return result; } catch (JSONException e) { return null; } } @Test public void JunitParseJsonArray() { String str = "{\"id\": 1,\"ids\":[101,102,103],\"total_number\": 3}"; String key = "ids"; ArrayList evaluate = evaluate(str, key); System.out.println(JSON.toJSONString(evaluate)); } }

打包cn.lagou.dw-1.0-SNAPSHOT-jar-with-dependencies.jar 使用自定义 UDF 函数:

-- 添加开发的jar包(在Hive命令行中) add jar /data/lagoudw/jars/cn.lagou.dw-1.0-SNAPSHOT-jar-with-dependencies.jar; -- 创建临时函数。指定类名一定要完整的路径,即包名加类名 create temporary function lagou_json_array as "cn.lagou.dw.hive.udf.ParseJsonArray"; -- 执行查询 use test; -- 解析json串中的数组 select username, age, sex, lagou_json_array(json, "ids") ids from jsont1; -- 解析json串中的数组,并展开 select username, age, sex, ids1 from jsont1 lateral view explode(lagou_json_array(json, "ids")) t1 as ids1; -- 解析json串中的id、num select username, age, sex, id, num from jsont1 lateral view json_tuple(json, 'id', 'total_number') t1 as id,num; -- 解析json串中的数组,并展开 select username, age, sex, ids1, id, num from jsont1 lateral view explode(lagou_json_array(json, "ids")) t1 as ids1 lateral view json_tuple(json, 'id', 'total_number') t1 as id,num;

在这里插入图片描述

4.3、使用SerDe处理

序列化是对象转换为字节序列的过程;反序列化是字节序列恢复为对象的过程; 对象的序列化主要有两种用途:

对象的持久化,即把对象转换成字节序列后保存到文件中对象数据的网络传送

SerDe 是Serializer 和 Deserializer 的简写形式。Hive使用Serde进行行对象的序列与反序列化。最后实现把文件内容映射到 hive 表中的字段数据类型。SerDe包括Serialize/Deserilize 两个功能:

Serialize把Hive使用的java object转换成能写入HDFS字节序列,或者其他系统能识别的流文件Deserilize把字符串或者二进制流转换成Hive能识别的java object对象

Read : HDFS files => InputFileFormat => => Deserializer => Rowobject Write : Row object => Seriallizer => => OutputFileFormat => HDFSfiles 常见:https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide#DeveloperGuide-HiveSerDe

Hive本身自带了几个内置的SerDe,还有其他一些第三方的SerDe可供选择。

create table t11(id string) stored as parquet; create table t12(id string) stored as ORC; desc formatted t11; desc formatted t12;

LazySimpleSerDe(默认的SerDe) ParquetHiveSerDe OrcSerde

对于纯 json 格式的数据,可以使用 JsonSerDe 来处理。

{"id": 1,"ids": [101,102,103],"total_number": 3} {"id": 2,"ids": [201,202,203,204],"total_number": 4} {"id": 3,"ids": [301,302,303,304,305],"total_number": 5} {"id": 4,"ids": [401,402,403,304],"total_number": 5} {"id": 5,"ids": [501,502,503],"total_number": 3} create table jsont2( id int, ids array, total_number int ) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'; load data local inpath '/data/lagoudw/data/json2.dat' into table jsont2; 各种Json格式处理方法小结:

1、简单格式的json数据,使用get_json_object、json_tuple处理 2、对于嵌套数据类型,可以使用UDF 3、纯json串可使用JsonSerDe处理更简单

第5节 DWD层建表和数据加载

第5节 DWD层建表和数据加载

2020-08-02 18:19:32.966 [main] INFO com.lagou.ecommerce.AppStart - {"app_active": {"name":"app_active","json": {"entry":"1","action":"1","error_code":"0"},"time":1596309585861},"attr":{"area":"绍 兴","uid":"2F10092A10","app_v":"1.1.16","event_type":"common"," device_id":"1FB872-9A10010","os_type":"3.0","channel":"ML","language":"chinese","b rand":"Huawei-2"}} 2020-08-02

主要任务:ODS(包含json串) => DWD json数据解析,丢弃无用数据(数据清洗),保留有效信息,并将数据展开,形成每日启动明细表。

5.1、创建DWD层表 use DWD; drop table if exists dwd.dwd_start_log; CREATE TABLE dwd.dwd_start_log( `device_id` string, `area` string, `uid` string, `app_v` string, `event_type` string, `os_type` string, `channel` string, `language` string, `brand` string, `entry` string, `action` string, `error_code` string ) PARTITIONED BY (dt string) STORED AS parquet;

表的格式:parquet、分区表

5.2、加载DWD层数据

script/member_active/dwd_load_startlog.sh

#!/bin/bash source /etc/profile # 可以输入日期;如果未输入日期取昨天的时间 if [ -n "$1" ] then do_date=$1 else do_date=`date -d "-1 day" +%F` fi # 定义要执行的SQL sql=" with tmp as( select split(str, ' ')[7] line from ods.ods_start_log where dt='$do_date' ) insert overwrite table dwd.dwd_start_log partition(dt='$do_date') select get_json_object(line, '$.attr.device_id'), get_json_object(line, '$.attr.area'), get_json_object(line, '$.attr.uid'), get_json_object(line, '$.attr.app_v'), get_json_object(line, '$.attr.event_type'), get_json_object(line, '$.attr.os_type'), get_json_object(line, '$.attr.channel'), get_json_object(line, '$.attr.language'), get_json_object(line, '$.attr.brand'), get_json_object(line, '$.app_active.json.entry'), get_json_object(line, '$.app_active.json.action'), get_json_object(line, '$.app_active.json.error_code') from tmp; " hive -e "$sql"

可以执行下面命令导入测试数据

[root@linux122 ~]# cd /data/lagoudw/script/member_active [root@linux122 member_active]# dwd_load_startlog.sh 2020-07-21 # 2020-07-21 为传入的参数,如果没有,sh文件里取当天前面一天的日期

日志文件 =》 Flume =》 HDFS =》 ODS =》 DWD ODS =》 DWD;json数据的解析;数据清洗 下一步任务:DWD(会员的每日启动信息明细) => DWS(如何建表,如何加载数 据) 活跃会员 ===> 新增会员 ===> 会员留存

第6节 活跃会员

活跃会员:打开应用的会员即为活跃会员; 新增会员:第一次使用应用的用户,定义为新增会员; 留存会员:某段时间的新增会员,经过一段时间后,仍继续使用应用认为是留存会员;

活跃会员指标需求:每日、每周、每月的活跃会员数 DWD:会员的每日启动信息明细(会员都是活跃会员;某个会员可能会出现多次) DWS:每日活跃会员信息(关键)、每周活跃会员信息、每月活跃会员信息 每日活跃会员信息 ===> 每周活跃会员信息 每日活跃会员信息 ===> 每月活跃会员信息 ADS:每日、每周、每月活跃会员数(输出)

ADS表结构: daycnt weekcnt monthcnt dt

备注:周、月为自然周、自然月 处理过程: 1、建表(每日、每周、每月活跃会员信息) 2、每日启动明细 ===> 每日活跃会员 3、每日活跃会员 => 每周活跃会员;每日活跃会员 => 每月活跃会员 4、汇总生成ADS层的数据

6.1、创建DWS层表 use dws; drop table if exists dws.dws_member_start_day; create table dws.dws_member_start_day ( `device_id` string, `uid` string, `app_v` string, `os_type` string, `language` string, `channel` string, `area` string, `brand` string ) COMMENT '会员日启动汇总' partitioned by(dt string) stored as parquet; drop table if exists dws.dws_member_start_week; create table dws.dws_member_start_week( `device_id` string, `uid` string, `app_v` string, `os_type` string, `language` string, `channel` string, `area` string, `brand` string, `week` string ) COMMENT '会员周启动汇总' PARTITIONED BY (`dt` string) stored as parquet; drop table if exists dws.dws_member_start_month; create table dws.dws_member_start_month( `device_id` string, `uid` string, `app_v` string, `os_type` string, `language` string, `channel` string, `area` string, `brand` string, `month` string ) COMMENT '会员月启动汇总' PARTITIONED BY (`dt` string) stored as parquet; 6.2、加载DWS层数据

script/member_active/dws_load_member_start.sh

#!/bin/bash source /etc/profile # 可以输入日期;如果未输入日期取昨天的时间 if [ -n "$1" ] then do_date=$1 else do_date=`date -d "-1 day" +%F` fi # 定义要执行的SQL # 汇总得到每日活跃会员信息;每日数据汇总得到每周、每月数据 sql=" insert overwrite table dws.dws_member_start_day partition(dt='$do_date') select device_id, concat_ws('|', collect_set(uid)), concat_ws('|', collect_set(app_v)), concat_ws('|', collect_set(os_type)), concat_ws('|', collect_set(language)), concat_ws('|', collect_set(channel)), concat_ws('|', collect_set(area)), concat_ws('|', collect_set(brand)) from dwd.dwd_start_log where dt='$do_date' group by device_id; -- 汇总得到每周活跃会员 insert overwrite table dws.dws_member_start_week partition(dt='$do_date') select device_id, concat_ws('|', collect_set(uid)), concat_ws('|', collect_set(app_v)), concat_ws('|', collect_set(os_type)), concat_ws('|', collect_set(language)), concat_ws('|', collect_set(channel)), concat_ws('|', collect_set(area)), concat_ws('|', collect_set(brand)), date_add(next_day('$do_date', 'mo'), -7) from dws.dws_member_start_day where dt >= date_add(next_day('$do_date', 'mo'), -7) and dt = date_format('$do_date', 'yyyy-MM-01') and dt


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3