Big Data: Data Warehouse Construction (Part 1)


I. The Data Warehouse

A data warehouse is analysis-oriented and reflects historical change. Typical volume at a small-to-medium company: about 10 million records (roughly 17 GB) per day, about 500 GB per month, and about 6 TB per year.

Data warehouse technology:

1. A traditional data warehouse usually runs on a relational database.
2. In the big data stack: HDFS for storage; Spark or MapReduce for computation; Spark SQL or Hive as the SQL engine. Impala can serve as both the compute engine and the SQL engine, and ClickHouse can cover storage, computation, and the SQL engine all at once.

Layered design:

Tables in a data warehouse are usually managed and computed in layers:

ODS layer: the operational data store, a source-aligned layer holding the rawest data.
DWD layer: the data warehouse detail layer; generally the ODS tables reworked and split by subject area; records here are still at detail granularity.
DWS layer: the data warehouse service layer, with light aggregation.
ADS layer: the application layer, mostly final report tables.

The higher the layer, the longer its data is retained. As a rule of thumb, ODS keeps three months, DWD six months, DWS one year, and ADS three years.

II. ODS Layer Development

Data can be loaded into the Hive warehouse on a schedule with LOAD statements; a minimal sketch follows.
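For illustration, a scheduled daily load could look like the sketch below. The staging path /logdata/app/<date> and the table name ods17.app_action_log are assumptions made up for this example (only the dwd17.* tables appear later in the original code); the point is the LOAD DATA ... PARTITION pattern issued through a Hive-enabled SparkSession.

package cn.doitedu.dw.ods

import org.apache.spark.sql.SparkSession

/**
 * Sketch: a scheduled job that loads one day's raw log files into an ODS table.
 * The staging path and table name below are illustrative assumptions.
 */
object OdsDailyLoad {
  def main(args: Array[String]): Unit = {
    val dt = args(0) // e.g. "2020-10-07", passed in by the scheduler

    val spark = SparkSession.builder()
      .enableHiveSupport()
      .appName("ods daily load")
      .getOrCreate()

    // Move the day's staged files into the corresponding partition of the ODS table
    spark.sql(
      s"""
         |load data inpath '/logdata/app/${dt}'
         |into table ods17.app_action_log
         |partition(dt='${dt}')
         |""".stripMargin)

    spark.close()
  }
}

The same statement can also be run directly from the Hive CLI; submitting it through spark-submit simply keeps all warehouse jobs under one scheduler, like the IdBind job later in this article.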

III. DWD Layer Development

1. Dimension Integration

Approach: build a geohash-keyed geo dictionary and an ip2region geo dictionary, ship both as broadcast variables, and then, in the log-processing operator, take each record's GPS latitude/longitude, convert it to a geohash code and look it up in the geo dictionary; if the GPS lookup fails, fall back to matching the IP address against the ip2region dictionary.

Resolving latitude/longitude into an administrative location such as 河北省,石家庄市,裕华区 (Hebei province, Shijiazhuang city, Yuhua district) requires the GeoHash toolkit. The program below turns the raw geo dictionary into a geohash-keyed one.

package cn.doitedu.dw.util

import java.util.Properties

import ch.hsr.geohash.GeoHash
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction

/**
 * Turn the lat/lng geo dictionary into a geohash-keyed geo dictionary
 */
object GeoHashDict {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .config("spark.sql.shuffle.partitions", "2")
      .appName("geo dictionary build")
      .master("local")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Load the raw area table from MySQL
    val props = new Properties()
    props.load(GeoHashDict.getClass.getClassLoader.getResourceAsStream("db.properties"))
    val df = spark.read.jdbc("jdbc:mysql://localhost:3306/realtimedw?useUnicode=true&characterEncoding=utf8", "t_md_areas", props)

    // Flatten the hierarchy: province -> city -> region
    df.createTempView("df")
    val df2 = spark.sql(
      """
        |select
        |  province.areaname as province,
        |  city.areaname     as city,
        |  region.areaname   as region,
        |  region.bd09_lng   as lng,
        |  region.bd09_lat   as lat
        |from df region
        |  join df city     on region.parentid = city.id and region.level = 3
        |  join df province on city.parentid = province.id
        |""".stripMargin)
    df2.show(20, false)

    // UDF: encode lat/lng into a 5-character geohash code
    val gps2geo: UserDefinedFunction = udf((lat: Double, lng: Double) => {
      GeoHash.geoHashStringWithCharacterPrecision(lat, lng, 5)
    })

    val res = df2.select('province, 'city, 'region, gps2geo('lat, 'lng) as "geohash")

    res.write.parquet("dataware/data/geodict")

    spark.close()
  }
}

Resolve an IP address into province, city, and county (district) information.

Standalone version:

package cn.doitedu.test

import org.lionsoul.ip2region.{DataBlock, DbConfig, DbSearcher}

object Ip2RegionDemo {
  def main(args: Array[String]): Unit = {
    val searcher = new DbSearcher(new DbConfig(), "dataware/data/ip2region/ip2region.db")

    val block1: DataBlock = searcher.memorySearch("221.218.212.95")
    val block2: DataBlock = searcher.memorySearch("255.255.255.255")
    val block3: DataBlock = searcher.memorySearch("221.218.212.0")

    println(block1)
    println(block2)
    println(block3)
  }
}

Cluster version:

package cn.doitedu.test

import java.io.{File, FileInputStream}

import org.apache.spark.sql.SparkSession
import org.lionsoul.ip2region.{DbConfig, DbSearcher}

object Ip2RegionSparkDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    val ds = spark.createDataset(Seq("221.218.212.95", "255.255.255.255", "221.218.212.0"))

    // Read the ip2region db file into a byte array and broadcast it to the executors
    val dbFile: File = new File("dataware/data/ip2region/ip2region.db")
    val in: FileInputStream = new FileInputStream(dbFile)
    val dbBin: Array[Byte] = new Array[Byte](dbFile.length().toInt)
    in.read(dbBin)

    val bc = spark.sparkContext.broadcast(dbBin)

    // One DbSearcher per partition, built from the broadcast bytes
    val res = ds.mapPartitions(iter => {
      val byteArr = bc.value
      val searcher = new DbSearcher(new DbConfig(), byteArr)
      iter.map(ip => {
        val areaInfo = searcher.memorySearch(ip).toString
        (ip, areaInfo)
      })
    }).toDF("ip", "area")

    res.show(100, false)

    spark.close()
  }
}

2. Device-ID and Account Binding (Score Computation)

Problem: when the user is logged in, the log carries the login id (account), which identifies the user precisely; in the anonymous state no login id is captured, so anonymous events have to be attributed to the account most strongly associated with the device.

Approach: once a device ID has been bound to a login ID (A), if over a subsequent window (say, one month) the device is used more often by a new login ID (B), the device ID is rebound to login ID (B).

Steps:

1. Load the day-T log data and extract device id, login account, session id, and timestamp.
2. Group by device id + login account and compute each account's login count on each device as a score.
3. Load the T-1 binding-score result.
4. Full join the day-T score table with the T-1 score table and merge: if a (device, account) pair appears on both sides, add the scores; if it appears only in T-1 (no login on day T), decay the score. For example, with a 0.9 decay factor, a pair scored 300 yesterday with no login today becomes 270, while a pair scored 300 yesterday and 200 today becomes 500. The rule in isolation is sketched right below.
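To make the merge rule in step 4 concrete, here is a small self-contained sketch of just that rule. The 0.9 decay factor mirrors the SQL used in the roll-forward job below; the object and method names are only for illustration.

object BindScoreRule {
  // Merge rule for one (deviceid, account) pair after the full join:
  // todayScore is None when the account did not log in on this device on day T;
  // histScore is None when the pair is absent from the T-1 table.
  def mergedScore(todayScore: Option[Double], histScore: Option[Double]): Double =
    (todayScore, histScore) match {
      case (Some(t), Some(h)) => t + h   // present on both days: accumulate
      case (Some(t), None)    => t       // brand-new pair: today's score only
      case (None, Some(h))    => h * 0.9 // no login on day T: decay the history
      case (None, None)       => 0.0     // cannot occur after a full join
    }

  def main(args: Array[String]): Unit = {
    println(mergedScore(Some(200), Some(300))) // 500.0
    println(mergedScore(None, Some(300)))      // 270.0
  }
}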

Test version:

Step 1: compute the previous day's (historical) scores

package cn.doitedu.dw.util

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructType}

/**
 * Binding computation between device id and login account
 * For experimenting with the logic:
 * only considers a single day's data
 */
object IdBindOneDay {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .config("spark.sql.shuffle.partitions", "2")
      .enableHiveSupport()
      .appName("id bind - one day")
      .master("local")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val schema = new StructType()
      .add("account", DataTypes.StringType)
      .add("deviceid", DataTypes.StringType)
      .add("sessionid", DataTypes.StringType)
      .add("ts", DataTypes.LongType)

    val logDf = spark.read.option("header", "true").schema(schema)
      //.csv("dataware/data/idbind/input/day01")  // local path
      .csv("/idbind/input/day01")                 // HDFS path
    logDf.createTempView("logdf")

    // Score = 100 points per distinct session in which the account logged in on this device
    val loginCnts = spark.sql(
      """
        |insert into table dwd17.id_account_bind partition(dt='2020-10-06')
        |select
        |  deviceid,
        |  account,
        |  min(ts) as first_login_ts,
        |  count(distinct sessionid)*100 as bind_score
        |from logdf
        |group by deviceid, account
        |""".stripMargin)

    spark.close()
  }
}

Step 2: roll forward and merge every day

package cn.doitedu.dw.util

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructType}

/**
 * Device-id / login-account binding computation
 * Rolls forward and merges every day
 */
object IdBindRollCombine {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .config("spark.sql.shuffle.partitions", "2")
      .enableHiveSupport() // enable Hive integration (also requires the spark-hive dependency and the hadoop/hive config files)
      .appName("id bind - rolling merge")
      .master("local")
      .getOrCreate()

    val schema = new StructType()
      .add("account", DataTypes.StringType)
      .add("deviceid", DataTypes.StringType)
      .add("sessionid", DataTypes.StringType)
      .add("ts", DataTypes.LongType)

    // Load the T-day log data
    val logDf = spark.read.option("header", "true").schema(schema).csv("/idbind/input/day02")
    logDf.createTempView("logdf")

    // Compute the T-day device -> account binding scores
    val loginCnts = spark.sql(
      """
        |select
        |  deviceid,
        |  account,
        |  -- count(distinct sessionid) as login_cnt,
        |  min(ts) as first_login_ts,
        |  count(distinct sessionid)*100 as bind_score
        |from logdf
        |group by deviceid, account
        |""".stripMargin)
    loginCnts.createTempView("today")
    println("Today's bind scores")
    loginCnts.show(100)

    // Load the T-1 binding scores (from the Hive score table)
    // val bindScorePre = spark.read.parquet("dataware/data/idbind/output/day01")
    val bindScorePre = spark.read.table("dwd17.id_account_bind").where("dt='2020-10-06'")
    println("Historical bind scores")
    bindScorePre.show(100)
    bindScorePre.createTempView("yestoday")

    // Full outer join the two score tables
    // and write the result into the T-day partition (the T-1 partition is no longer needed)
    val combined = spark.sql(
      """
        |insert into table dwd17.id_account_bind partition(dt='2020-10-07')
        |
        |select
        |  if(today.deviceid is null, yestoday.deviceid, today.deviceid) as deviceid,
        |  if(today.account  is null, yestoday.account,  today.account)  as account,
        |  if(yestoday.first_login_ts is not null, yestoday.first_login_ts, today.first_login_ts) as first_login_ts,
        |  -- if(today.account is null, yestoday.login_cnt, today.login_cnt+yestoday.login_cnt) as login_cnt,
        |  if(today.account is null,
        |     yestoday.bind_score*0.9,
        |     today.bind_score + if(yestoday.bind_score is null, 0, yestoday.bind_score)) as bind_score
        |from today
        |full join yestoday
        |  on today.deviceid=yestoday.deviceid and today.account=yestoday.account
        |""".stripMargin)

    spark.close()
  }
}

Production version:

Step 1: roll forward and merge every day

package cn.doitedu.dw.util

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataTypes, StructType}

/**
 * Device-id / login-account binding computation
 * Rolls forward and merges every day
 *
 * For production deployment, run it on YARN
 */
object IdBind {
  def main(args: Array[String]): Unit = {

    // args(0) = master, args(1) = calculation date (T), args(2) = history date (T-1)
    // NOTE: the original listing was truncated here; the argument check, session setup
    // and T-day log loading below are reconstructed from the test version and the submit
    // script, and the ODS table name is an assumption.
    if (args.length < 3) {
      println("usage: IdBind <master> <dt_calc> <dt_hist>")
      sys.exit(1)
    }

    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .config("spark.sql.shuffle.partitions", "2")
      .enableHiveSupport()
      .appName("id_bind_calc")
      .master(args(0))
      .getOrCreate()

    // Load the T-day log data (source table name assumed)
    val logDf = spark.read.table("ods17.app_action_log").where(s"dt='${args(1)}'")
    logDf.createTempView("logdf")

    // Compute the T-day device -> account binding scores
    val loginCnts = spark.sql(
      """
        |select
        |  deviceid,
        |  if(account is null or trim(account)='', null, account) as account,
        |  -- count(distinct sessionid) as login_cnt,
        |  min(timestamp) as first_login_ts,
        |  count(distinct sessionid)*100 as bind_score
        |from logdf
        |group by deviceid, account
        |""".stripMargin)
    loginCnts.createTempView("today")
    println("Today's bind scores")
    loginCnts.show(100)

    // Load the T-1 binding scores (from the Hive score table)
    // val bindScorePre = spark.read.parquet("dataware/data/idbind/output/day01")
    val bindScorePre = spark.read.table("dwd17.id_account_bind").where(s"dt='${args(2)}'")
    println("Historical bind scores")
    bindScorePre.show(100)
    bindScorePre.createTempView("yestoday")

    // Full outer join the two score tables
    // and write the result into the T-day partition (the T-1 partition is no longer needed)
    val combined = spark.sql(
      s"""
         |insert into table dwd17.id_account_bind partition(dt='${args(1)}')
         |
         |select
         |  if(today.deviceid is null, yestoday.deviceid, today.deviceid) as deviceid,
         |  if(today.account  is null, yestoday.account,  today.account)  as account,
         |  if(yestoday.first_login_ts is not null, yestoday.first_login_ts, today.first_login_ts) as first_login_ts,
         |  -- if(today.account is null, yestoday.login_cnt, today.login_cnt+yestoday.login_cnt) as login_cnt,
         |  if(today.account is null,
         |     yestoday.bind_score*0.9,
         |     today.bind_score + if(yestoday.bind_score is null, 0, yestoday.bind_score)) as bind_score
         |from today
         |full join yestoday
         |  on today.deviceid=yestoday.deviceid and today.account=yestoday.account
         |""".stripMargin)

    spark.close()
  }
}

Step 2: write a shell script and run it on a schedule

#!/bin/bash
# @author hunter@doitedu
# @date   2020-10-10
# @desc   launcher script for the id-binding score job

DT_HIST=`date -d'-2 day' +%Y-%m-%d`
DT_CACL=`date -d'-1 day' +%Y-%m-%d`

if [[ $1 && $2 ]]
then
  DT_HIST=$1
  DT_CACL=$2
fi

echo $DT_HIST
echo $DT_CACL

export SPARK_HOME=/opt/apps/spark-2.4.4

echo "about to submit the job; calculation date: ${DT_CACL}, history score date: ${DT_HIST}"

${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--class cn.doitedu.dw.util.IdBind \
--jars json-serde-1.3.8.jar \
--name id_bind_calc \
--driver-memory 1024M \
--executor-memory 2G \
--queue default \
--num-executors 2 idbind.jar yarn $DT_CACL $DT_HIST

if [ $? -eq 0 ]
then
  echo "congratulations! Job finished successfully. Mail sent to [email protected]"
else
  echo "what a pity! Job failed. Mail sent to [email protected]"
fi

Notes on the production YARN parameters:

* yarn.resourcemanager.scheduler.class: org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler, the class to use as the resource scheduler
* yarn.scheduler.minimum-allocation-mb: 1024, the minimum memory (MB) allocated to a container
* yarn.scheduler.maximum-allocation-mb: 8192, the maximum memory (MB) allocated to a container
* yarn.scheduler.minimum-allocation-vcores: 1, the minimum number of vcores allocated to a container
* yarn.scheduler.maximum-allocation-vcores: 4, the maximum number of vcores allocated to a container
* yarn.nodemanager.resource.memory-mb: 8192 (8 GB), the total memory available to containers on each NodeManager
* yarn.nodemanager.resource.cpu-vcores: 8, the total number of vcores available to containers on each NodeManager

3. Loading ODS Data into DWD

1. Define a bean and a BeanUtil helper.

package cn.doitedu.dw.etl

import org.apache.spark.sql.Row

// Bean for one app action log record; the trailing var fields
// (country/province/city/region) are filled in during the DWD geo-integration step
case class AppActionBean(
    val account: String,
    val appid: String,
    val appversion: String,
    val carrier: String,
    val deviceid: String,
    val devicetype: String,
    val eventid: String,
    val ip: String,
    val latitude: Double,
    val longitude: Double,
    val nettype: String,
    val osname: String,
    val osversion: String,
    val releasechannel: String,
    val resolution: String,
    val sessionid: String,
    val timestamp: Long,
    val properties: Map[String, String],
    var country: String = null,
    var province: String = null,
    var city: String = null,
    var region: String = null
)

object BeanUtil {
  // Convert a Row of the ODS log DataFrame into an AppActionBean
  def rowToBean(row: Row): AppActionBean = {
    val account = row.getAs[String]("account")
    val appid = row.getAs[String]("appid")
    val appversion = row.getAs[String]("appversion")
    val carrier = row.getAs[String]("carrier")
    val deviceid = row.getAs[String]("deviceid")
    val devicetype = row.getAs[String]("devicetype")
    val eventid = row.getAs[String]("eventid")
    val ip = row.getAs[String]("ip")
    val latitude = row.getAs[Double]("latitude")
    val longitude = row.getAs[Double]("longitude")
    val nettype = row.getAs[String]("nettype")
    val osname = row.getAs[String]("osname")
    val osversion = row.getAs[String]("osversion")
    val releasechannel = row.getAs[String]("releasechannel")
    val resolution = row.getAs[String]("resolution")
    val sessionid = row.getAs[String]("sessionid")
    val timestamp = row.getAs[Long]("timestamp")
    val properties = row.getAs[Map[String, String]]("properties")

    AppActionBean(account, appid, appversion, carrier, deviceid, devicetype, eventid, ip,
      latitude, longitude, nettype, osname, osversion, releasechannel, resolution,
      sessionid, timestamp, properties)
  }
}

2. The AppActionLogOdsToDwd job

package cn.doitedu.dw.etl

import java.io.{File, FileInputStream}

import ch.hsr.geohash.GeoHash
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.lionsoul.ip2region.{DbConfig, DbSearcher}

/**
 * App action log: ODS layer processed into the DWD layer
 * Main requirements:
 * 1. cleansing and filtering
 * 2. JSON flattening (the properties event-attribute field is not flattened; it is stored whole as a map)
 * 3. geo integration (match by GPS first; if there is no GPS, fall back to the IP address)
 * 4. tag every record with a guid
 * 5. data normalization
 * 6. new/returning visitor marking
 */
object AppActionLogOdsToDwd {
  def main(args: Array[String]): Unit = {

    // args(0) = master, args(1) = calculation date (T), args(2) = previous date (T-1)
    // NOTE: the original listing was truncated here; the argument check, session setup
    // and ODS load below are reconstructed, and the ODS table name is an assumption.
    if (args.length < 3) {
      println("usage: AppActionLogOdsToDwd <master> <dt_calc> <dt_pre>")
      sys.exit(1)
    }

    val spark = SparkSession.builder()
      .config("spark.sql.shuffle.partitions", "2")
      .enableHiveSupport()
      .appName("app action log ods to dwd")
      .master(args(0))
      .getOrCreate()
    import spark.implicits._

    // Load the T-day app action log from the ODS layer (table name assumed)
    val df = spark.read.table("ods17.app_action_log").where(s"dt='${args(1)}'")

    // UDF for blank-string checks
    val isempty = (s: String) => { StringUtils.isBlank(s) }
    spark.udf.register("isempty", isempty)

    val washed = df
      // drop records where both account and deviceid are empty
      .where("!(isempty(deviceid) and isempty(account))")
      // properties / eventid / sessionid must all be present
      .where("properties is not null and !isempty(eventid) and !isempty(sessionid)")
      // some data arrives late
      .where(s"from_unixtime(cast(timestamp/1000 as bigint),'yyyy-MM-dd')='${args(1)}'")

    // TODO session splitting
    // TODO data normalization

    // Load the geohash geo dictionary
    val geoDictDF = spark.read.parquet("/dicts/geodicts")

    // Reshape the dictionary into key/value form and collect it into a driver-side map
    val geodictMap = geoDictDF.rdd.map(row => {
      val geohash = row.getAs[String]("geohash")
      val province = row.getAs[String]("province")
      val city = row.getAs[String]("city")
      val region = row.getAs[String]("region")
      (geohash, (province, city, region))
    }).collectAsMap()

    // Broadcast it
    val bc1 = spark.sparkContext.broadcast(geodictMap)

    // Load the ip2region database (local-disk version)
    /*val dbFile: File = new File("dataware/data/ip2region/ip2region.db")
    val in: FileInputStream = new FileInputStream(dbFile)
    val dbBin: Array[Byte] = new Array[Byte](dbFile.length().toInt)
    in.read(dbBin)*/

    // Load the ip2region database (HDFS version)
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://doitedu01:8020/")
    val fs = FileSystem.get(conf)
    val in = fs.open(new Path("/dicts/ip2region/ip2region.db"))
    val dbFile: FileStatus = fs.listStatus(new Path("/dicts/ip2region/ip2region.db"))(0)
    val dbBin: Array[Byte] = new Array[Byte](dbFile.getLen.toInt)
    in.readFully(0L, dbBin)

    val bc2 = spark.sparkContext.broadcast(dbBin)

    // Geo dimension integration
    val integrated = washed.rdd.map(row => {
      // geo dictionary from the broadcast variable
      val geoDict = bc1.value
      // ip2region database (byte array) from the broadcast variable
      val ipDictBytes = bc2.value
      val ipSearcher = new DbSearcher(new DbConfig(), ipDictBytes)

      val bean = BeanUtil.rowToBean(row)

      var country: String = "中国"
      var province: String = null
      var city: String = null
      var region: String = null

      // Look up province/city/region from the latitude/longitude
      try {
        val geoCode = GeoHash.geoHashStringWithCharacterPrecision(bean.latitude, bean.longitude, 5)
        val area: (String, String, String) = geoDict.getOrElse(geoCode, (null, null, null))
        province = area._1
        city = area._2
        region = area._3
      } catch {
        case e: Exception => e.printStackTrace()
      }

      // If there was no GPS match, look up province/city/region from the IP address
      if (province == null) {
        val block = ipSearcher.memorySearch(bean.ip).toString
        val split = block.split("\\|") // e.g. 2163|中国|华南|广东省|深圳市|鹏博士
        if (split.size > 4) {
          country = split(1)
          province = split(3)
          city = split(4)
        }
      }

      if (country.equals("0")) {
        bean.country = "内网国"
        bean.province = "内网省"
        bean.city = "内网市"
        bean.region = "内网区"
      } else {
        bean.country = country
        bean.province = if (province != null && province.equals("0")) null else province
        bean.city = if (city != null && city.equals("0")) null else city
        bean.region = if (region != null && region.equals("0")) null else region
      }

      bean
    }).toDF()

    // ID MAPPING
    // Load the device-id/account binding score table: a device id may carry scores for several accounts
    val bindScore = spark.read.table("dwd17.id_account_bind").where(s"dt='${args(1)}'")

    // Reduce it to: one device id -> the single guid with the highest weight
    // t.deviceid | t.account | t.first_login_ts | t.bind_score |
    import org.apache.spark.sql.functions._
    val wd = Window.partitionBy('deviceid).orderBy('bind_score desc, 'first_login_ts asc)
    val bindDictDF = bindScore
      .select('deviceid, 'account, row_number() over (wd) as "rn")
      .where("rn=1")
      .selectExpr("deviceid", "nvl(account,deviceid) as guid")

    // JOIN the processed log data with the device/account binding mapping
    integrated.createTempView("log")
    bindDictDF.createTempView("bind")
    val guidDF = spark.sql(
      """
        |select
        |  log.account,
        |  log.appid,
        |  log.appversion,
        |  log.carrier,
        |  log.deviceid,
        |  log.devicetype,
        |  log.eventid,
        |  log.ip,
        |  log.latitude,
        |  log.longitude,
        |  log.nettype,
        |  log.osname,
        |  log.osversion,
        |  log.releasechannel,
        |  log.resolution,
        |  log.sessionid,
        |  log.timestamp,
        |  log.properties,
        |  log.country,
        |  log.province,
        |  log.city,
        |  log.region,
        |  bind.guid
        |from log join bind on log.deviceid=bind.deviceid
        |""".stripMargin)

    // New/returning visitor marking: first load the T-1 binding score table
    val ids: DataFrame = spark.read.table("dwd17.id_account_bind")
      .where(s"dt='${args(2)}'")
      .select(explode(array("deviceid", "account")) as "id")
      .distinct()

    // JOIN the processed log data with the ids table
    guidDF.createTempView("log_guid")
    ids.createTempView("ids")
    spark.sql(
      s"""
         |insert into dwd17.app_action_detail partition(dt='${args(1)}')
         |select
         |  a.account,
         |  a.appid,
         |  a.appversion,
         |  a.carrier,
         |  a.deviceid,
         |  a.devicetype,
         |  a.eventid,
         |  a.ip,
         |  a.latitude,
         |  a.longitude,
         |  a.nettype,
         |  a.osname,
         |  a.osversion,
         |  a.releasechannel,
         |  a.resolution,
         |  a.sessionid,
         |  a.timestamp as ts,
         |  a.properties,
         |  a.country,
         |  a.province,
         |  a.city,
         |  a.region,
         |  a.guid,
         |  if(b.id is not null,0,1) as isnew
         |from log_guid a left join ids b on a.guid = b.id
         |""".stripMargin)

    spark.close()
  }
}

4. New vs. Returning Visitor Marking

Take every account and deviceid from the T-1 binding-score table and explode them into a single column to form an id table, then join the log table against the id table on log.guid = id.id.

Decision logic: records that find a match in the join are returning visitors; records that do not are new visitors.

Other approaches: a bitmap or a Bloom filter built over the historical ids can be broadcast so that the "has this account been seen before" check happens on the map side, with no shuffle join (HyperLogLog, also often mentioned here, is better suited to counting distinct visitors than to per-record membership checks).
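As a hedged sketch of the Bloom-filter variant: Spark ships a serializable BloomFilter sketch (org.apache.spark.util.sketch.BloomFilter, built with DataFrame.stat.bloomFilter) that can be broadcast and queried record by record inside a UDF. The table names reuse dwd17.id_account_bind and dwd17.app_action_detail from above; the dates, expected item count and false-positive rate are illustrative. A Bloom filter can return false positives, so a small fraction of genuinely new visitors may be flagged as returning.

package cn.doitedu.dw.etl

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Sketch: map-side new/returning-visitor flag via a broadcast Bloom filter.
 * Table names and dates reuse the examples above; sizes are illustrative.
 */
object NewVisitorBloomSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .enableHiveSupport()
      .appName("bloom filter isnew sketch")
      .master("local")
      .getOrCreate()

    // All ids (device ids and accounts) known up to T-1
    val ids = spark.read.table("dwd17.id_account_bind")
      .where("dt='2020-10-06'")
      .select(explode(array("deviceid", "account")) as "id")
      .where("id is not null")
      .distinct()

    // Build a Bloom filter over the id column and broadcast it
    val bloom = ids.stat.bloomFilter("id", 10000000L, 0.01)
    val bc = spark.sparkContext.broadcast(bloom)

    // Each record is flagged on the map side, without shuffling the id table
    spark.udf.register("seen_before", (guid: String) => guid != null && bc.value.mightContain(guid))

    spark.read.table("dwd17.app_action_detail").where("dt='2020-10-07'")
      .selectExpr("guid", "if(seen_before(guid), 0, 1) as isnew")
      .show(20, false)

    spark.close()
  }
}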


