第三章 电商分析之会员活跃度

您所在的位置:网站首页 什么是活跃度 第三章 电商分析之会员活跃度

第三章 电商分析之会员活跃度

2024-07-14 06:35| 来源: 网络整理| 查看: 265

电商分析之会员活跃度 第 1 节 需求分析

会员数据是后期营销的很重要的数据。网店会专门针对会员进行一系列营销活动。

电商会员一般门槛较低,注册网站即可加入。有些电商平台的高级会员具有时效性,需要购买VIP会员卡或一年内消费额达到多少才能成为高级会员。

 

计算指标:

新增会员:每日新增会员数

活跃会员:每日,每周,每月的活跃会员数

会员留存:1日,2日,3日会员留存数、1日,2日,3日会员留存率

 

指标口径业务逻辑:

会员:以设备为判断标准,每个独立设备认为是一个会员。Android系统通常根据IMEI号,IOS系统通常根据OpenUDID来标识一个独立会员,每部移动设备是一个会员;

活跃会员:打开应用的会员即为活跃会员,暂不考虑用户的实际使用情况。一台设备每天多次打开计算为一个活跃会员。在自然周内启动过应用的会员为周活跃会员,同理还有月活跃会员;

会员活跃率:一天内活跃会员数与总会员数的比率是日活跃率;还有周活跃率(自 然周)、月活跃率(自然月);

新增会员:第一次使用应用的用户,定义为新增会员;卸载再次安装的设备,不会被算作一次新增。新增用户包括日新增会员、周(自然周)新增会员、月(自然 月)新增会员;

留存会员与留存率:某段时间的新增会员,经过一段时间后,仍继续使用应用认为是留存会员;这部分会员占当时新增会员的比例为留存率。

已知条件: 1、明确了需求 2、输入:启动日志(OK)、事件日志 3、输出:新增会员、活跃会员、留存会员 4、日志文件、ODS、DWD、DWS、ADS(输出) 下一步做什么? 数据采集:日志文件 => Flume => HDFS => ODS 第 2 节 日志数据采集

原始日志数据(一条启动日志)

2020-07-30 14:18:47.339 [main] INFO com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529} ,"attr":{"area":"泰 安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","dev ice_id":"1FB872- 9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","br and":"iphone-9"}}

数据采集的流程:

image.png

选择Flume作为采集日志数据的工具:

Flume 1.6 无论是Spooling Directory Source、Exec Source均不能很好的满足动态实时收集的需求 Flume 1.8+ 提供了一个非常好用的 Taildir Source使用该source,可以监控多个目录,对目录中新写入的数据进行实时采集 2.1、taildir source配置

taildir Source的特点:

使用正则表达式匹配目录中的文件名监控的文件中,一旦有数据写入,Flume就会将信息写入到指定的Sink高可靠,不会丢失数据不会对跟踪文件有任何处理,不会重命名也不会删除不支持Windows,不能读二进制文件。支持按行读取文本文件

 

taildir source配置

a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /root/data/lagoudw/conf/startlog_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /root/data/lagoudw/logs/start/.*log positionFile

配置检查点文件的路径,检查点文件会以 json 格式保存已经读取文件的位置,解决断点续传的问题

filegroups

指定filegroups,可以有多个,以空格分隔(taildir source可同时监控多个目录中的文件)

filegroups.

配置每个filegroup的文件绝对路径,文件名可以用正则表达式匹配

2.2、hdfs sink配置 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path =/user/data/logs/start/%Y-%m-%d/ a1.sinks.k1.hdfs.filePrefix = startlog. # 配置文件滚动方式(文件大小32M) a1.sinks.k1.hdfs.rollSize = 33554432 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.idleTimeout = 0 a1.sinks.k1.hdfs.minBlockReplicas = 1 # 向hdfs上刷新的event的个数 a1.sinks.k1.hdfs.batchSize = 100 # 使用本地时间 a1.sinks.k1.hdfs.useLocalTimeStamp = true

HDFS Sink 都会采用滚动生成文件的方式,滚动生成文件的策略有:

基于时间。hdfs.rollInterval 30秒基于文件大小。hdfs.rollSize 1024字节基于event数量。hdfs.rollCount 10个event基于文件空闲时间。hdfs.idleTimeout 0minBlockReplicas。默认值与 hdfs 副本数一致。设为1是为了让 Flume 感知不到hdfs的块复制,此时其他的滚动方式配置(时间间隔、文件大小、events数量)才不会受影响 2.3、Agent的配置 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # taildir source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /root/data/lagoudw/conf/startlog_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /root/data/lagoudw/logs/start/.*log # memorychannel a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 2000 # hdfs sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/ a1.sinks.k1.hdfs.filePrefix = startlog a1.sinks.k1.hdfs.fileType = DataStream # 配置文件滚动方式(文件大小32M) a1.sinks.k1.hdfs.rollSize = 33554432 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.idleTimeout = 0 a1.sinks.k1.hdfs.minBlockReplicas = 1 # 向hdfs上刷新的event的个数 a1.sinks.k1.hdfs.batchSize = 1000 # 使用本地时间 a1.sinks.k1.hdfs.useLocalTimeStamp = true # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

/data/lagoudw/conf/flume-log2hdfs.conf

2.4、Flume的优化配置

1、启动agent

flume-ng agent --conf-file /data/lagoudw/conf/flume-log2hdfs1.conf -name a1 -Dflume.roog.logger=INFO,console

2、向 /data/lagoudw/logs/ 目录中放入日志文件

报错: java.lang.OutOfMemoryError: GC overhead limit exceeded

image.png

缺省情况下 Flume jvm堆最大分配20m,这个值太小,需要调整。

3、解决方案:在 $FLUME_HOME/conf/flume-env.sh 中增加以下内容

export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote" # 要想使配置文件生效,还要在命令行中指定配置文件目录 flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/lagoudw/conf/flume-log2hdfs1.conf -name a1 - Dflume.roog.logger=INFO,console Flume内存参数设置及优化: 根据日志数据量的大小,Jvm堆一般要设置为4G或更高-Xms -Xmx 最好设置一致,减少内存抖动带来的性能影响

存在的问题:Flume放数据时,使用本地时间;不理会日志的时间戳

2.5、自定义拦截器

前面 Flume Agent 的配置使用了本地时间,可能导致数据存放的路径不正确。

要解决以上问题需要使用自定义拦截器。

agent用于测试自定义拦截器。netcat source =>logger sink

# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # source a1.sources.r1.type = netcat a1.sources.r1.bind = linux122 a1.sources.r1.port = 9999 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = cn.lagou.dw.flume.interceptor.CustomerInterceptor$Builder # channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 # sink a1.sinks.k1.type = logger # source、channel、sink之间的关系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

自定义拦截器的原理:

1、自定义拦截器要集成Flume 的 Interceptor

2、Event 分为header 和 body(接收的字符串)

3、获取header和body

4、从body中获取"time":1596382570539,并将时间戳转换为字符串 "yyyy-MM- dd"

5、将转换后的字符串放置header中

 

自定义拦截器的实现:

1、获取 event 的 header

2、获取 event 的 body

3、解析body获取json串

4、解析json串获取时间戳

5、将时间戳转换为字符串 "yyyy-MM-dd"

6、将转换后的字符串放置header中

7、返回event

UTF-8 org.apache.flume flume-ng-core 1.9.0 provided com.alibaba fastjson 1.1.23 maven-compiler-plugin 2.3.2 1.8 1.8 maven-assembly-plugin jar-with-dependencies make-assembly package single package cn.lagou.dw.flume.interceptor; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.google.common.base.Strings; import org.apache.commons.compress.utils.Charsets; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.event.SimpleEvent; import org.apache.flume.interceptor.Interceptor; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class CustomerInterceptor implements Interceptor { private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); @Override public void initialize() { } @Override public Event intercept(Event event) { String eventbody = new String(event.getBody(), Charsets.UTF_8); Map headerMap = event.getHeaders(); String[] bodyArr = eventbody.split("\\s+"); try { String jsonStr = bodyArr[6]; if (Strings.isNullOrEmpty(jsonStr)) { return null; } JSONObject jsonObject = JSON.parseObject(jsonStr).getJSONObject("app_active"); String timestampStr = jsonObject.getString("time"); // 将 timestamp 转为 时间日期类型(格式:yyyy-MM-dd) long timeStamp = Long.parseLong(timestampStr); String date = formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault())); headerMap.put("logtime", date); event.setHeaders(headerMap); } catch (Exception e) { headerMap.put("logtime", "unknown"); event.setHeaders(headerMap); } return event; } @Override public List intercept(List events) { List out = new ArrayList(); for(Event event : events) { Event outEvent = intercept(event); if (outEvent != null) { out.add(outEvent); } } return out; } @Override public void close() { } public static class Builder implements Interceptor.Builder { @Override public Interceptor build() { return new CustomerInterceptor(); } @Override public void configure(Context context) { } } }

将程序打包,放在 flume/lib目录下;

启动Agent测试

flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf/ --conf-file /root/data/lagoudw/conf/flumetest1.conf -name a1 -Dflume.root.logger=INFO,console 2.6、采集启动日志(使用自定义拦截器)

1、定义配置文件

a1.sources = r1 a1.sinks = k1 a1.channels = c1 # taildir source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /root/data/lagoudw/conf/startlog_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /root/data/lagoudw/logs/start/.*log a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = cn.lagou.dw.flume.interceptor.CustomerInterceptor$Builder # memorychannel a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 2000 # hdfs sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /user/data/logs/start/dt=%{logtime}/ a1.sinks.k1.hdfs.filePrefix = startlog. a1.sinks.k1.hdfs.fileType = DataStream # 配置文件滚动方式(文件大小32M) a1.sinks.k1.hdfs.rollSize = 33554432 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.idleTimeout = 0 a1.sinks.k1.hdfs.minBlockReplicas = 1 # 向hdfs上刷新的event的个数 a1.sinks.k1.hdfs.batchSize = 1000 v a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

修改:

给source增加自定义拦截器去掉本地时间戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true根据header中的logtime写文件

2、启动服务

# 测试 flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf/ --conf-file /root/data/lagoudw/conf/flume-log2hdfs2.conf -name a1 -Dflume.root.logger=INFO,consolet.logger=INFO,console

3、拷贝日志 

4、检查HDFS文件

2.7、采集启动日志和事件日志

本系统中要采集两种日志:启动日志、事件日志,不同的日志放置在不同的目录下。要想一次拿到全部日志需要监控多个目录。

image.png

总体思路

1、taildir监控多个目录

2、修改自定义拦截器,不同来源的数据加上不同标志

3、hdfs sink 根据标志写文件

 

Agent配置

a1.sources = r1 a1.sinks = k1 a1.channels = c1 # taildir source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /root/data/lagoudw/conf/startlog_position.json a1.sources.r1.filegroups = f1 f2 a1.sources.r1.filegroups.f1 = /root/data/lagoudw/logs/start/.*log a1.sources.r1.headers.f1.logtype = start a1.sources.r1.filegroups.f2 = /root/data/lagoudw/logs/event/.*log a1.sources.r1.headers.f2.logtype = event # 自定义拦截器 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = cn.lagou.dw.flume.interceptor.LogTypeInterceptor$Builder # memorychannel a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 2000 # hdfs sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/ a1.sinks.k1.hdfs.filePrefix = %{logtype}log a1.sinks.k1.hdfs.fileType = DataStream # 配置文件滚动方式(文件大小32M) a1.sinks.k1.hdfs.rollSize = 33554432 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.idleTimeout = 0 a1.sinks.k1.hdfs.minBlockReplicas = 1 # 向hdfs上刷新的event的个数 a1.sinks.k1.hdfs.batchSize = 1000 # 使用本地时间 # a1.sinks.k1.hdfs.useLocalTimeStamp = true # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 filegroups

指定filegroups,可以有多个,以空格分隔(taildir source可同时监控多个目录中的文件)

headers..

给event增加header key。不同的filegroup,可配置不同的value

 

自定义拦截器

编码完成后打包上传服务器,放置在$FLUME_HOME/lib 下

package cn.lagou.dw.flume.interceptor; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.google.common.base.Strings; import org.apache.commons.compress.utils.Charsets; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.Map; public class LogTypeInterceptor implements Interceptor { private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); @Override public void initialize() { } @Override public Event intercept(Event event) { String eventbody = new String(event.getBody(), Charsets.UTF_8); Map headerMap = event.getHeaders(); String[] bodyArr = eventbody.split("\\s+"); try { String jsonStr = bodyArr[6]; if (Strings.isNullOrEmpty(jsonStr)) { return null; } String timestampStr = ""; JSONObject jsonObject = JSON.parseObject(jsonStr); if (headerMap.getOrDefault("logtype", "").equals("start")){ // 取启动日志的时间戳 timestampStr = jsonObject.getJSONObject("app_active").getString("time"); } else if (headerMap.getOrDefault("logtype","").equals("event")) { // 取事件日志第一条记录的时间戳 JSONArray jsonArray = jsonObject.getJSONArray("lagou_event"); if (jsonArray.size() > 0){ timestampStr = jsonArray.getJSONObject(0).getString("time"); } } // 将 timestamp 转为 时间日期类型(格式:yyyy-MM-dd) long timeStamp = Long.parseLong(timestampStr); String date = formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault())); headerMap.put("logtime", date); event.setHeaders(headerMap); } catch (Exception e) { headerMap.put("logtime", "unknown"); event.setHeaders(headerMap); } return event; } @Override public List intercept(List events) { List out = new ArrayList(); for(Event event : events) { Event outEvent = intercept(event); if (outEvent != null) { out.add(outEvent); } } return out; } @Override public void close() { } public static class Builder implements Interceptor.Builder { @Override public Interceptor build() { return new LogTypeInterceptor(); } @Override public void configure(Context context) { } } @Test public void testJunit(){ String str = new String("2020-08-20 12:00:58.400 [main] INFO com.lagou.ecommerce.AppEvent - {\"lagou_event\":[{\"name\":\"goods_detail_loading\",\"json\":{\"entry\":\"3\",\"goodsid\":\"0\",\"loading_time\":\"100\",\"action\":\"3\",\"staytime\":\"34\",\"showtype\":\"4\"},\"time\":1595340530671},{\"name\":\"praise\",\"json\":{\"id\":4,\"type\":2,\"add_time\":\"1597827924588\",\"userid\":5,\"target\":9},\"time\":1595301323236}],\"attr\":{\"area\":\"文登\",\"uid\":\"2F10092A1\",\"app_v\":\"1.1.9\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1001\",\"os_type\":\"0.93\",\"channel\":\"BB\",\"language\":\"chinese\",\"brand\":\"xiaomi-9\"}}"); Map map = new HashMap(); Event event = new SimpleEvent(); map.put("logtype","event"); event.setHeaders(map); event.setBody(str.getBytes(Charsets.UTF_8)); LogTypeInterceptor customerInterceptor = new LogTypeInterceptor(); Event outEvent = customerInterceptor.intercept(event); Map headers = outEvent.getHeaders(); System.out.println(JSON.toJSONString(headers)); } }

测试

启动Agent,拷贝日志,检查HDFS文件

# 清理环境 rm -f /data/lagoudw/conf/startlog_position.json rm -f /data/lagoudw/logs/start/*.log rm -f /data/lagoudw/logs/event/*.log # 启动 Agent flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf/ --conf-file /root/data/lagoudw/conf/flume-log2hdfs3.conf -name a1 -Dflume.root.logger=INFO,console # 拷贝日志 cd /data/lagoudw/logs/source cp event0802.log ../event/ cp start0802.log ../start/ # 检查HDFS文件 hdfs dfs -ls /user/data/logs/event hdfs dfs -ls /user/data/logs/start # 生产环境中用以下方式启动Agent nohup flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/lagoudw/conf/flume-log2hdfs3.conf -name a1 - Dflume.root.logger=INFO,LOGFILE > /dev/null 2>&1 & nohup,该命令允许用户退出帐户/关闭终端之后继续运行相应的进程/dev/null,代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称黑洞标准输入0,从键盘获得输入 /proc/self/fd/0标准输出1,输出到屏幕(控制台) /proc/self/fd/1错误输出2,输出到屏幕(控制台) /proc/self/fd/2>/dev/null 标准输出1重定向到 /dev/null 中,此时标准输出不存在,没有任何地方能够找到输出的内容2>&1 错误输出将会和标准输出输出到同一个地方>/dev/null 2>&1 不会输出任何信息到控制台,也不会有任何信息输出到文件中 2.8 日志数据采集小结 使用taildir source 监控指定的多个目录,可以给不同目录的日志加上不同header在每个目录中可以使用正则匹配多个文件使用自定义拦截器,主要功能是从json串中获取时间戳,加到event的header中hdfs sink使用event header中的信息写数据(控制写文件的位置)hdfs文件的滚动方式(基于文件大小、基于event数量、基于时间)调节flume jvm内存的分配 第 3 节 ODS建表和数据加载

image.png

ODS层的数据与源数据的格式基本相同。

创建ODS层表:

use ODS; create external table ods.ods_start_log( `str` string) comment '用户启动日志信息' partitioned by (`dt` string) location '/user/data/logs/start'; -- 加载数据的功能(测试时使用) alter table ods.ods_start_log add partition(dt='2020-08-02'); alter table ods.ods_start_log drop partition (dt='2020-08-02');

加载启动日志数据:

script/member_active/ods_load_log.sh

可以传参数确定日志,如果没有传参使用昨天日期

#!/bin/bash APP=ODS hive=/opt/lagou/servers/hive-2.3.7/bin/hive # 可以输入日期;如果未输入日期取昨天的时间 if [ -n "$1" ] then do_date=$1 else do_date=`date -d "-1 day" +%F` fi # 定义要执行的SQL sql=" alter table "$APP".ods_start_log add partition(dt='$do_date'); " $hive -e "$sql" 第 4 节 json数据处理

数据文件中每行必须是一个完整的 json 串,一个 json串不能跨越多行。

Hive 处理json数据总体来说有三个办法:

使用内建的函数get_json_object、json_tuple使用自定义的UDF第三方的SerDe 4.1、使用内建函数处理

get_json_object(string json_string, string path)

返回值:String

说明:解析json字符串json_string,返回path指定的内容;如果输入的json字符串无效,那么返回NUll;函数每次只能返回一个数据项;

json_tuple(jsonStr, k1, k2, ...)

返回值:所有的输入参数、输出参数都是String;

说明:参数为一组键k1,k2,。。。。。和json字符串,返回值的元组。该方法比 get_json_object高效,因此可以在一次调用中输入多个键;

explode,使用explod将Hive一行中复杂的 array 或 map 结构拆分成多行。

测试数据:

user1;18;male;{"id": 1,"ids": [101,102,103],"total_number": 3} user2;20;female;{"id": 2,"ids": [201,202,203,204],"total_number":4} user3;23;male;{"id": 3,"ids":[301,302,303,304,305],"total_number": 5} user4;17;male;{"id": 4,"ids": [401,402,403,304],"total_number":5} user5;35;female;{"id": 5,"ids": [501,502,503],"total_number": 3}

建表加载数据:

CREATE TABLE IF NOT EXISTS jsont1( username string, age int, sex string, json string ) row format delimited fields terminated by ';'; load data local inpath '/root/data/lagoudw/test/weibo.json' overwrite into table jsont1;

json的处理:

-- get 单层值 select username, age, sex, get_json_object(json, "$.id") id, get_json_object(json, "$.ids") ids, get_json_object(json, "$.total_number") num from jsont1; -- get 数组 select username, age, sex, get_json_object(json, "$.id") id, get_json_object(json, "$.ids[0]") ids0, get_json_object(json, "$.ids[1]") ids1, get_json_object(json, "$.ids[2]") ids2, get_json_object(json, "$.ids[3]") ids3, get_json_object(json, "$.total_number") num from jsont1; -- 使用 json_tuple 一次处理多个字段 select json_tuple(json, 'id', 'ids', 'total_number') from jsont1; -- 有语法错误,只能单独处理json串。 select username, age, sex, json_tuple(json, 'id', 'ids', 'total_number') from jsont1; -- 使用 explode + lateral view -- 在上一步的基础上,再将数据展开 -- 第一步,将 [101,102,103] 中的 [ ] 替换掉 -- select "[101,102,103]" -- select "101,102,103" select regexp_replace("[101,102,103]", "\\[|\\]", ""); -- 第二步,将上一步的字符串变为数组 select split(regexp_replace("[101,102,103]", "\\[|\\]", ""), ","); -- 第三步,使用explode + lateral view 将数据展开 select username, age, sex, id, ids, num from jsont1 lateral view json_tuple(json, 'id', 'ids', 'total_number') t1 as id, ids, num; -- 完整代码 with tmp as( select username, age, sex, id, ids, num from jsont1 lateral view json_tuple(json, 'id', 'ids', 'total_number') t1 as id, ids, num ) select username, age, sex, id, ids1, num from tmp lateral view explode(split(regexp_replace(ids, "\\[|\\]", ""), ",")) t1 as ids1;

小结:json_tuple 优点是一次可以解析多个json字段,对嵌套结果的解析操作复杂;

4.2、使用UDF处理

自定义UDF处理json串中的数组。

自定义UDF函数:

输入:json串、数组的key

输出:字符串数组

 

pom文件增加依赖

org.apache.hive hive-exec 2.3.7 provided package cn.lagou.dw.hive.udf; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import com.google.common.base.Strings; import org.apache.hadoop.hive.ql.exec.UDF; import org.junit.Test; import java.util.ArrayList; public class ParseJsonArray extends UDF { public ArrayList evaluate(String jsonStr, String arrKey){ if (Strings.isNullOrEmpty(jsonStr)) { return null; } try{ JSONObject object = JSON.parseObject(jsonStr); JSONArray jsonArray = object.getJSONArray(arrKey); ArrayList result = new ArrayList(); for (Object o: jsonArray){ result.add(o.toString()); } return result; } catch (JSONException e){ return null; } } @Test public void JunitParseJsonArray(){ String str = "{\"id\": 1,\"ids\":[101,102,103],\"total_number\": 3}"; String key = "ids"; ArrayList evaluate = evaluate(str, key); System.out.println(JSON.toJSONString(evaluate)); } }

使用自定义 UDF 函数:

-- 添加开发的jar包(在Hive命令行中) add jar /root/data/lagoudw/jars/cn.lagou.dw-1.0-SNAPSHOT-jar-with-dependencies.jar; -- 创建临时函数。指定类名一定要完整的路径,即包名加类名 create temporary function lagou_json_array as "cn.lagou.dw.hive.udf.ParseJsonArray"; -- 执行查询 -- 解析json串中的数组 select username, age, sex, lagou_json_array(json, "ids") ids from jsont1; -- 解析json串中的数组,并展开 select username, age, sex, ids1 from jsont1 lateral view explode(lagou_json_array(json, "ids")) t1 as ids1; -- 解析json串中的id、num select username, age, sex, id, num from jsont1 lateral view json_tuple(json, 'id', 'total_number') t1 as id, num; -- 解析json串中的数组,并展开 select username, age, sex, ids1, id, num from jsont1 lateral view explode(lagou_json_array(json, "ids")) t1 as ids1 lateral view json_tuple(json, 'id', 'total_number') t1 as id, num; 4.3、使用SerDe处理

序列化是对象转换为字节序列的过程;反序列化是字节序列恢复为对象的过程;

对象的序列化主要有两种用途:

对象的持久化,即把对象转换成字节序列后保存到文件中对象数据的网络传送

SerDe 是Serializer 和 Deserializer 的简写形式。Hive使用Serde进行对象的序列与反序列化。最后实现把文件内容映射到 hive 表中的字段数据类型。SerDe包括 Serialize/Deserilize 两个功能:

Serialize把Hive使用的java object转换成能写入HDFS字节序列,或者其他系统能识别的流文件Deserialize把字符串或者二进制流转换成Hive能识别的java object对象

Read : HDFS files => InputFileFormat => => Deserializer => Row object

Write : Row object => Seriallizer => => OutputFileFormat => HDFS files

常见:https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide#Deve loperGuide-HiveSerDe

 

Hive本身自带了几个内置的SerDe,还有其他一些第三方的SerDe可供选择。

create table t11(id string) stored as parquet; create table t12(id string) stored as ORC; desc formatted t11; desc formatted t12;

LazySimpleSerDe(默认的SerDe)

ParquetHiveSerDe

OrcSerde

 

对于纯 json 格式的数据,可以使用 JsonSerDe 来处理。

{"id": 1,"ids": [101,102,103],"total_number": 3} {"id": 2,"ids": [201,202,203,204],"total_number": 4} {"id": 3,"ids": [301,302,303,304,305],"total_number": 5} {"id": 4,"ids": [401,402,403,304],"total_number": 5} {"id": 5,"ids": [501,502,503],"total_number": 3} create table jsont2( id int, ids array, total_number int ) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'; load data local inpath '/root/data/lagoudw/test/json.dat' into table jsont2;

各种Json格式处理方法小结:

1、简单格式的json数据,使用get_json_object、json_tuple处理

2、对于嵌套数据类型,可以使用UDF

3、纯json串可使用JsonSerDe处理更简单

第 5 节 DWD层建表和数据加载 2020-08-02 18:19:32.966 [main] INFO com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json": {"entry":"1","action":"1","error_code":"0"},"time":1596309585861} ,"attr":{"area":"绍 兴","uid":"2F10092A10","app_v":"1.1.16","event_type":"common","de vice_id":"1FB872- 9A10010","os_type":"3.0","channel":"ML","language":"chinese","bra nd":"Huawei-2"}}

主要任务:ODS(包含json串) => DWD json数据解析,丢弃无用数据(数据清洗),保留有效信息,并将数据展开,形成每日启动明细表。

5.1、创建DWD层表 use DWD; drop table if exists dwd.dwd_start_log; CREATE TABLE dwd.dwd_start_log( `device_id` string, `area` string, `uid` string, `app_v` string, `event_type` string, `os_type` string, `channel` string, `language` string, `brand` string, `entry` string, `action` string, `error_code` string ) PARTITIONED BY (dt string) STORED AS parquet;

表的格式:parquet、分区表

5.2、加载DWD层数据

script/member_active/dwd_load_start.sh

#!/bin/bash source /etc/profile # 可以输入日期;如果未输入日期取昨天的时间 if [ -n "$1" ] then do_date=$1 else do_date=`date -d "-1 day" +%F` fi # 定义要执行的SQL sql=" with tmp as( select split(str, ' ')[7] line from ods.ods_start_log where dt='$do_date' ) insert overwrite table dwd.dwd_start_log partition(dt='$do_date') select get_json_object(line, '$.attr.device_id'), get_json_object(line, '$.attr.area'), get_json_object(line, '$.attr.uid'), get_json_object(line, '$.attr.app_v'), get_json_object(line, '$.attr.event_type'), get_json_object(line, '$.attr.os_type'), get_json_object(line, '$.attr.channel'), get_json_object(line, '$.attr.language'), get_json_object(line, '$.attr.brand'), get_json_object(line, '$.app_active.json.entry'), get_json_object(line, '$.app_active.json.action'), get_json_object(line, '$.app_active.json.error_code') from tmp; " hive -e "$sql"

日志文件 =》 Flume =》 HDFS =》 ODS =》 DWD ODS =》 DWD;

json数据的解析;数据清洗

 

下一步任务:DWD(会员的每日启动信息明细) => DWS(如何建表,如何加载数据)

活跃会员 ===> 新增会员 ===> 会员留存

第 6 节 活跃会员

活跃会员:打开应用的会员即为活跃会员;

新增会员:第一次使用应用的用户,定义为新增会员;

留存会员:某段时间的新增会员,经过一段时间后,仍继续使用应用认为是留存会员;

活跃会员指标需求:每日、每周、每月的活跃会员数

DWD:会员的每日启动信息明细(会员都是活跃会员;某个会员可能会出现多次)

DWS:每日活跃会员信息(关键)、每周活跃会员信息、每月活跃会员信息

每日活跃会员信息 ===> 每周活跃会员信息

每日活跃会员信息 ===> 每月活跃会员信息

ADS:每日、每周、每月活跃会员数(输出)

ADS表结构: daycnt weekcnt monthcnt dt

备注:周、月为自然周、自然月

 

处理过程:

1、建表(每日、每周、每月活跃会员信息)

2、每日启动明细 ===> 每日活跃会员

3、每日活跃会员 => 每周活跃会员;每日活跃会员 => 每月活跃会员

4、汇总生成ADS层的数据

6.1、创建DWS层表 use dws; drop table if exists dws.dws_member_start_day; create table dws.dws_member_start_day ( `device_id` string, `uid` string, `app_v` string, `os_type` string, `language` string, `channel` string, `area` string, `brand` string ) COMMENT '会员日启动汇总' partitioned by(dt string) stored as parquet; drop table if exists dws.dws_member_start_week; create table dws.dws_member_start_week( `device_id` string, `uid` string, `app_v` string, `os_type` string, `language` string, `channel` string, `area` string, `brand` string, `week` string ) COMMENT '会员周启动汇总' PARTITIONED BY (`dt` string) stored as parquet; drop table if exists dws.dws_member_start_month; create table dws.dws_member_start_month( `device_id` string, `uid` string, `app_v` string, `os_type` string, `language` string, `channel` string, `area` string, `brand` string, `month` string ) COMMENT '会员月启动汇总' PARTITIONED BY (`dt` string) stored as parquet; 6.2、加载DWS层数据

script/member_active/dws_load_member_start.sh

#!/bin/bash source /etc/profile # 可以输入日期;如果未输入日期取昨天的时间 if [ -n "$1" ] then do_date=$1 else do_date=`date -d "-1 day" +%F` fi # 定义要执行的SQL # 汇总得到每日活跃会员信息;每日数据汇总得到每周、每月数据 sql=" -- 汇总得到每日活跃会员 insert overwrite table dws.dws_member_start_day partition(dt='$do_date') select device_id, concat_ws('|', collect_set(uid)), concat_ws('|', collect_set(app_v)), concat_ws('|', collect_set(os_type)), concat_ws('|', collect_set(language)), concat_ws('|', collect_set(channel)), concat_ws('|', collect_set(area)), concat_ws('|', collect_set(brand)) from dwd.dwd_start_log where dt='$do_date' group by device_id; -- 汇总得到每周活跃会员 insert overwrite table dws.dws_member_start_week partition(dt='$do_date') select device_id, concat_ws('|', collect_set(uid)), concat_ws('|', collect_set(app_v)), concat_ws('|', collect_set(os_type)), concat_ws('|', collect_set(language)), concat_ws('|', collect_set(channel)), concat_ws('|', collect_set(area)), concat_ws('|', collect_set(brand)), date_add(next_day('$do_date', 'mo'), -7) from dws.dws_member_start_day where dt >= date_add(next_day('$do_date', 'mo'), -7) and dt = date_format('$do_date', 'yyyy-MM-01') and dt


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3