基于spark的用户画像项目实战

您所在的位置:网站首页 spark实战项目 基于spark的用户画像项目实战

基于spark的用户画像项目实战

2024-07-16 08:51| 来源: 网络整理| 查看: 265

转自千峰王溯老师

1、用户画像项目简介 1.1 什么是用户画像

所谓的用户画像就是给用户贴一些标签,通过标签说明用户是一个什么样的人。

具体来说,给用户贴一些标签之后,根据用户的目标、行为和观点的差异,将他们区分为不同的类型,然后从每种类型中抽取典型特征,赋予名字、照片、一些人口统计学要素、场景等描述,形成了一个人物原型。

过程就是:通过客户信息抽象为用户画像进而抽象出对客户的认知。

1.2 用户画像主要维度

人口属性:用户是谁(性别、年龄、职业等基本信息)

消费需求:消费习惯和消费偏好

购买能力:收入、购买力、购买频次、渠道

兴趣爱好:品牌偏好、个人兴趣

社交属性:用户活跃场景(社交媒体等)

1.3 用户画像的数据类型

数据有动态数据和静态数据,所谓的静态数据如性别和年龄等短期无法改变的数据;而动态数据就是如短期行为相关的数据,比如说今天我想买件裙子,明天我就去看裤子了,这种数据特征就是比较多变。

1.4 用户画像的用途

杀熟、推荐(非常多)【用户画像是推荐系统的重要数据源】、市场营销、客服

让用户和企业双赢。让用户快速找到想要的商品,让企业找到为产品买单的人。

(一)微观层面

在产品设计时,通过用户画像来描述用户的需求。

在数据应用上,可以用来推荐、搜索、风控

将定性分析和定量分析结合,进行数据化运营和用户分析

进行精准化营销

(二)宏观层面

确定发展的战略、战术方向

进行市场细分与用户分群,以市场为导向

(三)画像建模预测

进行人口属性细分:明确是谁,购买了什么,为什么

购买行为细分:提供市场机会、市场规模等关键信息

产品需求细分:提供更具差异化竞争力的产品规格和业务价值

兴趣态度细分:提供人群类别画像:渠道策略,定价策略,产品策略,品牌策略

1.5 用户画像的步骤

(一)确定画像的目标

在产品不同生命周期,或者不同使用途径,目标不同,对画像的需求也有所不同,所以进行画像之前需要明确目标是什么,需求是什么。

(二)确定所需用户画像的维度

根据目标确定用户画像所需要的维度,比如说想进行商品推荐,就需要能影响用户选择商品的因素作为画像维度。比如用户维度(用户的年龄、性别会影响用户的选择),资产维度(用户的收入等因素会影响用户对价格选择),行为维度(用户最近常看的应该是想买的)等等。

(三)确定画像的层级

用户画像层级越多,说明画像粒度越小,对用户的理解也越清晰。比如说用户维度,可以分为新用户和老用户,进而划分用户的性别、年龄等。这个需要根据目标需求进行划分。

(四)通过原始数据,采用机器学习算法为用户贴上标签

因为我们获得的原始数据是一些杂乱无章的数据,所以就需要算法通过某些特征为用户贴上标签

(五)通过机器学习算法将标签变为业务的输出

每个人会有很多很多的标签,需要进一步将这些标签转化为对用户的理解。需要对不同的标签建不同的权重,从而得出对业务的输出。比如说具有一些标签的用户会喜欢什么样的产品。

(六)业务产生数据,数据反哺业务,不断循环的闭环

1.6 常见的用户画像标签 2、系统架构 2.1 整体架构(线下项目) 2.2 数据处理流程(要做什么事)

ETL(Extract Tranform Load)用来描述数据从来源端,经过 抽取、转换、加载 到目的端的过程;

ODS(Operational Data Store)操作数据存储。此层数据无任何更改,直接沿用外围系统数据结构和数据,不对外开放;为临时存储层,是接口数据的临时存储区域,为后一步的数据处理做准备。

要实现的主要步骤:ETL、报表统计(数据分析)、生成商圈库、数据标签化(核心)

2.3 主要数据集(要分析的日志数据文件)说明 为整合后的日志数据,每天一份,json格式(离线处理) 这个数据集整合了内部、外部的数据,以及竞价信息(与广告相关) 数据的列非常多,接近百个 字段 解释 ip 设备的真实IP sessionid 会话标识 advertisersid 广告主ID adorderid 广告ID adcreativeid 广告创意ID( >= 200000 : dsp , < 200000 oss) adplatformproviderid 广告平台商ID(>= 100000: rtb , < 100000 : api ) sdkversionnumber SDK版本号 adplatformkey 平台商key putinmodeltype 针对广告主的投放模式,1:展示量投放 2:点击量投放 requestmode 数据请求方式(1:请求、2:展示、3:点击) adprice 广告价格 adppprice 平台商价格 requestdate 请求时间,格式为:yyyy-m-dd hh:mm:ss appid 应用id appname 应用名称 uuid 设备唯一标识,比如imei或者androidid等 device 设备型号,如htc、iphone client 设备类型 (1:android 2:ios 3:wp) osversion 设备操作系统版本,如4.0 density 备屏幕的密度 android的取值为0.75、1、1.5,ios的取值为:1、2 pw 设备屏幕宽度 ph 设备屏幕高度 provincename 设备所在省份名称 cityname 设备所在城市名称 ispid 运营商id ispname 运营商名称 networkmannerid 联网方式id networkmannername 联网方式名称 iseffective 有效标识(有效指可以正常计费的)(0:无效 1:有效) isbilling 是否收费(0:未收费 1:已收费) adspacetype 广告位类型(1:banner 2:插屏 3:全屏) adspacetypename 广告位类型名称(banner、插屏、全屏) devicetype 设备类型(1:手机 2:平板) processnode 流程节点(1:请求量kpi 2:有效请求 3:广告请求) apptype 应用类型id district 设备所在县名称 paymode 针对平台商的支付模式,1:展示量投放(CPM) 2:点击量投放(CPC) isbid 是否rtb bidprice rtb竞价价格 winprice rtb竞价成功价格 iswin 是否竞价成功 cur values:usd|rmb等 rate 汇率 cnywinprice rtb竞价成功转换成人民币的价格 imei 手机串码 mac 手机MAC码 idfa 手机APP的广告码 openudid 苹果设备的识别码 androidid 安卓设备的识别码 rtbprovince rtb 省 rtbcity rtb 市 rtbdistrict rtb 区 rtbstreet rtb 街道 storeurl app的市场下载地址 realip 真实ip isqualityapp 优选标识 bidfloor 底价 aw 广告位的宽 ah 广告位的高 imeimd5 imei_md5 macmd5 mac_md5 idfamd5 idfa_md5 openudidmd5 openudid_md5 androididmd5 androidid_md5 imeisha1 imei_sha1 macsha1 mac_sha1 idfasha1 idfa_sha1 openudidsha1 openudid_sha1 androididsha1 androidid_sha1 uuidunknow uuid_unknow UUID密文 userid 平台用户id iptype 表示ip库类型,1为点媒ip库,2为广告协会的ip地理信息标准库,默认为1 initbidprice 初始出价 adpayment 转换后的广告消费(保留小数点后6位) agentrate 代理商利润率 lomarkrate 代理利润率 adxrate 媒介利润率 title 标题 keywords 关键字 tagid 广告位标识(当视频流量时值为视频ID号) callbackdate 回调时间 格式为:YYYY/mm/dd hh:mm:ss channelid 频道ID mediatype 媒体类型 email 用户email tel 用户电话号码 sex 用户性别 age 用户年龄 3、创建工程【第二天重点】 3.1 步骤

新建一个Maven项目,用于处理数据

Maven 管理项目中用到的所有jar

修改 pom.xml 文件,增加:

定义依赖版本 导入依赖 定义配置文件

创建scala目录(src、test中分别创建)

在scala目录中创建包 cn.itbigdata.dmp

编写一个主程序的架构 (DMPApp)

增加配置文件 dev/application.conf

新增目录 utils,新增参数解析类 ConfigHolder

3.2 修改pom.xml文件【重点】

设置依赖版本信息

2.11.8 2.11 2.6.1 2.3.3 1.1.0 1.2.44 1.3.0 2.12.0 1.2.1

导入计算引擎的依赖

org.scala-lang scala-library ${scala.version} org.scala-lang scala-xml 2.11.0-M4 org.apache.hadoop hadoop-client ${hadoop.version} org.apache.spark spark-core_${scala.version.simple} ${spark.version} org.apache.spark spark-sql_${scala.version.simple} ${spark.version} org.apache.spark spark-graphx_${scala.version.simple} ${spark.version}

导入存储引擎的依赖(可省略)

org.apache.hive hive-jdbc org.apache.hive hive-service-rpc org.apache.hive hive-service ${hive.version}

导入工具依赖

com.maxmind.geoip geoip-api ${geoip.version} com.maxmind.geoip2 geoip2 ${geoip2.version} ch.hsr geohash ${geoip.version} org.json4s json4s-jackson_${scala.version.simple} 3.6.5 com.typesafe config ${config.version}

导入编译配置(可省略)

org.apache.maven.plugins maven-compiler-plugin 1.8 1.8 maven-assembly-plugin package single true cn.itcast.dmp.processing.App jar-with-dependencies 3.3 创建scala代码包 1558338673059.png

在 src/main/ 下 创建scala代码包

在scala包中创建 cn.itbigdata.dmp 包

在 cn.itbigdata.dmp 包下创建

beans(存放类的定义) etl(etl相关处理) report(报表处理) tradingarea(商圈库) tags(标签处理) customtrait(存放接口定义) utils(存放工具类)

在整个工程中建立data目录,存放要处理的数据

3.4 DmpApp 主程序(初始化部分) import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession // 项目的主程序,在这里完成相关的任务 object DmpApp { def main(args: Array[String]): Unit = { // 初始化 val conf: SparkConf = new SparkConf() .setMaster("local") .setAppName("DmpApp") .set("spark.worker.timeout", "600s") .set("spark.cores.max", "10") .set("spark.rpc.askTimeout", "600s") .set("spark.network.timeout", "600s") .set("spark.task.maxFailures", "5") .set("spark.speculation", "true") .set("spark.driver.allowMultipleContexts", "true") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.buffer.pageSize", "8m") .set("park.debug.maxToStringFields", "200") val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() // 关闭资源 spark.close() } } 3.5 spark相关参数解释 参数名 默认值 定义值 spark.worker.timeout 60 500 spark.network.timeout 120s 600s spark.rpc.askTimeout spark.network.timeout 600s spark.cores.max 10 spark.task.maxFailures 4 5 spark.speculation false true spark.driver.allowMultipleContexts false true spark.serializer org.apache.spark.serializer.JavaSerializer org.apache.spark.serializer.KryoSerializer spark.buffer.pageSize 1M - 64M,系统计算 8M spark.worker.timeout: 网络故障导致心跳长时间不上报给master,经过spark.worker.timeout(秒)时间后,master检测到worker异常,标识为DEAD状态,同时移除掉worker信息以及其上面的executor信息; spark.network.timeout:所有网络交互的默认超时。由网络或者 gc 引起,worker或executor没有接收到executor或task的心跳反馈。提高 spark.network.timeout 的值,根据情况改成300(5min)或更高; spark.rpc.askTimeout: rpc 调用的超时时间; spark.cores.max:每个应用程序所能申请的最大CPU核数; spark.task.maxFailures:当task执行失败时,并不会直接导致整个应用程序down掉,只有在重试了 spark.task.maxFailures 次后任然失败的情况下才会使程序down掉; spark.speculation:推测执行是指对于一个Stage里面运行慢的Task,会在其他节点的Executor上再次启动这个task,如果其中一个Task实例运行成功则将这个最先完成的Task的计算结果作为最终结果,同时会干掉其他Executor上运行的实例,从而加快运行速度; spark.driver.allowMultipleContexts: SparkContext默认只有一个实例,设置为true允许有多个实例; spark.serializer:在Spark的架构中,在网络中传递的或者缓存在内存、硬盘中的对象需要进行序列化操作【发给Executor上的Task;需要缓存的RDD(前提是使用序列化方式缓存);广播变量;shuffle过程中的数据缓存等】;默认的Java序列化方式性能不高,同时序列化后占用的字节数也较多;官方也推荐使用Kryo的序列化库。官方文档介绍,Kryo序列化机制比Java序列化机制性能提高10倍左右; spark.buffer.pageSize: spark内存分配的单位,无默认值,大小在1M-64M之间,spark根据jvm堆内存大小计算得到;值过小,内存分配效率低;值过大,造成内存的浪费; 3.6 开发环境参数配置文件

application.conf

// 开发环境参数配置文件 # App 信息 spark.appname="dmpApp" # spark 信息 spark.master="local[*]" spark.worker.timeout="120" spark.cores.max="10" spark.rpc.askTimeout="600s" spark.network.timeout="600s" spark.task.maxFailures="5" spark.speculation="true" spark.driver.allowMultipleContexts="true" spark.serializer="org.apache.spark.serializer.KryoSerializer" spark.buffer.pageSize="8m" # kudu 信息 kudu.master="node1:7051,node2:7051,node3:7051" # 输入数据的信息 addata.path="data/dataset_main.json" ipdata.geo.path="data/dataset_geoLiteCity.dat" qqwrydat.path="data/dataset_qqwry.dat" installDir.path="data" # 对应ETL输出信息 ods.prefix="ods" ad.data.tablename="adinfo" # 输出报表对应:地域统计、广告地域、APP、设备、网络、运营商、渠道 7个分析 report.region.stat.tablename="RegionStatAnalysis" report.region.tablename="AdRegionAnalysis" report.app.tablename="AppAnalysis" report.device.tablename="DeviceAnalysis" report.network.tablename="NetworkAnalysis" report.isp.tablename="IspAnalysis" report.channel.tablename="ChannelAnalysis" # 高德API gaoDe.app.key="a94274923065a14222172c9b933f4a4e" gaoDe.url="https://restapi.amap.com/v3/geocode/regeo?" # GeoHash (key的长度) geohash.key.length=10 # 商圈库 trading.area.tablename="tradingArea" # tags non.empty.field="imei,mac,idfa,openudid,androidid,imeimd5,macmd5,idfamd5,openudidmd5,androididmd5,imeisha1,macsha1,idfasha1,openudidsha1,androididsha1" appname.dic.path="data/dic_app" device.dic.path="data/dic_device" tags.table.name.prefix="tags" # 标签衰减系数 tag.coeff="0.92" # es 相关参数 es.cluster.name="cluster_es" es.index.auto.create="true" es.Nodes="192.168.40.164" es.port="9200" es.index.reads.missing.as.empty="true" es.nodes.discovery="false" es.nodes.wan.only="true" es.http.timeout="2000000" 3.7 配置文件解析类 // 解析参数文件帮助类 import com.typesafe.config.ConfigFactory object ConfigHolder { private val config = ConfigFactory.load() // App Info lazy val sparkAppName: String = config.getString("spark.appname") // Spark parameters lazy val sparkMaster: String = config.getString("spark.master") lazy val sparkParameters: List[(String, String)] = List( ("spark.worker.timeout", config.getString("spark.worker.timeout")), ("spark.cores.max", config.getString("spark.cores.max")), ("spark.rpc.askTimeout", config.getString("spark.rpc.askTimeout")), ("spark.network.timeout", config.getString("spark.network.timeout")), ("spark.task.maxFailures", config.getString("spark.task.maxFailures")), ("spark.speculation", config.getString("spark.speculation")), ("spark.driver.allowMultipleContexts", config.getString("spark.driver.allowMultipleContexts")), ("spark.serializer", config.getString("spark.serializer")), ("spark.buffer.pageSize", config.getString("spark.buffer.pageSize")) ) // kudu parameters lazy val kuduMaster: String = config.getString("kudu.master") // input dataset lazy val adDataPath: String = config.getString("addata.path") lazy val ipsDataPath: String = config.getString("ipdata.geo.path") def ipToRegionFilePath: String = config.getString("qqwrydat.path") def installDir: String = config.getString("installDir.path") // output dataset private lazy val delimiter = "_" private lazy val odsPrefix: String = config.getString("ods.prefix") private lazy val adInfoTableName: String = config.getString("ad.data.tablename") // lazy val ADMainTableName = s"$odsPrefix$delimiter$adInfoTableName$delimiter${DateUtils.getTodayDate()}" // report lazy val Report1RegionStatTableName: String = config.getString("report.region.stat.tablename") lazy val ReportRegionTableName: String = config.getString("report.region.tablename") lazy val ReportAppTableName: String = config.getString("report.app.tablename") lazy val ReportDeviceTableName: String = config.getString("report.device.tablename") lazy val ReportNetworkTableName: String = config.getString("report.network.tablename") lazy val ReportIspTableName: String = config.getString("report.isp.tablename") lazy val ReportChannelTableName: String = config.getString("report.channel.tablename") // GaoDe API private lazy val gaoDeKey: String = config.getString("gaoDe.app.key") private lazy val gaoDeTempUrl: String = config.getString("gaoDe.url") lazy val gaoDeUrl: String = s"$gaoDeTempUrl&key=$gaoDeKey" // GeoHash lazy val keyLength: Int = config.getInt("geohash.key.length") // 商圈库 lazy val tradingAreaTableName: String =config.getString("trading.area.tablename") // tags lazy val idFields: String = config.getString("non.empty.field") lazy val filterSQL: String = idFields .split(",") .map(field => s"$field is not null ") .mkString(" or ") lazy val appNameDic: String = config.getString("appname.dic.path") lazy val deviceDic: String = config.getString("device.dic.path") lazy val tagsTableNamePrefix: String = config.getString("tags.table.name.prefix") + delimiter lazy val tagCoeff: Double = config.getDouble("tag.coeff") // 加载 elasticsearch 相关参数 lazy val ESSparkParameters = List( ("cluster.name", config.getString("es.cluster.name")), ("es.index.auto.create", config.getString("es.index.auto.create")), ("es.nodes", config.getString("es.Nodes")), ("es.port", config.getString("es.port")), ("es.index.reads.missing.as.empty", config.getString("es.index.reads.missing.as.empty")), ("es.nodes.discovery", config.getString("es.nodes.discovery")), ("es.nodes.wan.only", config.getString("es.nodes.wan.only")), ("es.http.timeout", config.getString("es.http.timeout")) ) def main(args: Array[String]): Unit = { println(ConfigHolder.sparkParameters) println(ConfigHolder.installDir) } } 3.8 DmpApp主程序(使用配置文件) import cn.itbigdata.dmp.utils.ConfigHolder import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object DmpApp { def main(args: Array[String]): Unit = { // 1、初始化(SparkConf、SparkSession) val conf = new SparkConf() .setAppName(ConfigHolder.sparkAppName) .setMaster(ConfigHolder.sparkMaster) .setAll(ConfigHolder.sparkParameters) val spark: SparkSession = SparkSession.builder() .config(conf) .getOrCreate() spark.sparkContext.setLogLevel("warn") println("OK!") // 1、ETL // 2、报表 // 3、生成商圈库 // 4、标签化 // 关闭资源 spark.close() } } 4、ETL开发

需求:

将数据文件每一行中的 ip 地址,转换为经度、维度、省、市的信息;

ip => 经度、维度、省、市

保存转换后的数据文件(每天一个文件)

处理步骤:

读数据 数据处理 找出每一行数据中的ip地址 根据ip地址,算出对应的省、市、经度、纬度,添加到每行数据的尾部 保存数据 其他需求:数据每日加载一次,每天的数据单独存放在一个文件中

难点问题:处理数据(IP地址如何转化为省、市、经度、纬度)

4.1 搭建ETL架构

新建trait(Processor),为数据处理提供一个统一的接口类

import org.apache.spark.sql.SparkSession // 数据处理接口 // SparkSession 用于数据的加载和处理 // KuduContext 用于数据的保存 trait Processor { def process(spark: SparkSession) }

新建 ETLProcessor ,负责ETL处理

import cn.itbigdata.dmp.customtrait.Processor import org.apache.kudu.spark.kudu.KuduContext import org.apache.spark.sql.SparkSession object ETLProcessor extends Processor{ override def process(spark: SparkSession): Unit = { // 定义参数 val sourceDataFile = ConfigHolder.adLogPath val sinkDataPath = "" // 1 读数据 val sourceDF: DataFrame = spark.read.json(sourceDataFile) // 2 处理数据 // 2.1 找到ip // 2.2 将ip 转为 省、市、经度、维度 val rdd = sourceDF.rdd .map(row => { val ip: String = row.getAs[String]("ip") ip }) // 2.3 将省、市、经度、维度放在原数据的最后 // 3 保存数据 } } 4.2 IP地址转换为经纬度 使用GeoIP,将ip地址转为经纬度 GeoIP,是一套含IP数据库的软件工具 Geo根据来访者的IP, 定位该IP所在经纬度、国家/地区、省市、和街道等位置信息 GeoIP有两个版本,一个免费版,一个收费版本 收费版本的准确率高一些,更新频率也更频繁 因为GeoIP读取的是本地的二进制IP数据库,所以效率很高 4.3 IP地址转换为省市 纯真数据库,将ip转为省、市 纯真数据库收集了包括中国电信、中国移动、中国联通、长城宽带、聚友宽带等 ISP 的 IP 地址数据 纯真数据库是二进制文件,有开源的java代码,简单的修改,调用就可以了 case class Location(ip: String, region: String, city: String, longitude: Float, latitude: Float) private def ipToLocation(ip: String): Location ={ // 1 获取service val service = new LookupService("data/geoLiteCity.dat") // 2 获取Location val longAndLatLocation = service.getLocation(ip) // 3 获取经度、维度 val longitude = longAndLatLocation.longitude val latitude = longAndLatLocation.latitude // 4 利用纯真数据库获取省市 val ipService = new IPAddressUtils val regeinLocation: IPLocation = ipService.getregion(ip) val region = regeinLocation.getRegion val city = regeinLocation.getCity Location(ip, region, city, longitude, latitude) }

需要实现帮助类:

计算当天日期

import java.util.{Calendar, Date} import org.apache.commons.lang.time.FastDateFormat object DateUtils { def getToday: String = { val now = new Date FastDateFormat.getInstance("yyyyMMdd").format(now) } def getYesterday: String = { val calendar: Calendar = Calendar.getInstance calendar.set(Calendar.HOUR_OF_DAY, -24) FastDateFormat.getInstance("yyyyMMdd").format(calendar.getTime()) } def main(args: Array[String]): Unit = { println(getToday) println(getYesterday) } } 4.4 ETL完整实现 import java.util.Calendar import cn.itbigdata.dmp.customtrait.Processor import cn.itbigdata.dmp.util.iplocation.{IPAddressUtils, IPLocation} import com.maxmind.geoip.LookupService import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} object ETLProcessor extends Processor{ // 定义参数 private val sourceDataFile: String = "data/data.json" private val sinkDataPath: String = s"outputdata/maindata.${getYesterday}" private val geoFilePath: String = "data/geoLiteCity.dat" override def process(spark: SparkSession): Unit = { // 1 读数据 val sourceDF: DataFrame = spark.read.json("data/data.json") // 2 处理数据 // 2.1 找到ip // 2.2 将ip 转为 省、市、经度、维度 import spark.implicits._ val ipDF: DataFrame = sourceDF.rdd .map { row => val ip = row.getAs[String]("ip") // 将ip转换为 省、市、经度、纬度 ip2Location(ip) }.toDF // 2.2 ipDF 与 sourceDF 做join,给每一行增加省、市、经纬度 val sinkDF: DataFrame = sourceDF.join(ipDF, Seq("ip"), "inner") // 3 保存数据 sinkDF.write.mode(SaveMode.Overwrite).json(sinkDataPath) } case class Location(ip: String, region: String, city: String, longitude: Float, latitude: Float) private def ip2Location(ip: String): Location ={ // 1 获取service val service = new LookupService(geoFilePath) // 2 获取Location val longAndLatLocation = service.getLocation(ip) // 3 获取经度、维度 val longitude = longAndLatLocation.longitude val latitude = longAndLatLocation.latitude // 4 利用纯真数据库获取省市 val ipService = new IPAddressUtils val regionLocation: IPLocation = ipService.getregion(ip) val region = regionLocation.getRegion val city = regionLocation.getCity Location(ip, region, city, longitude, latitude) } private def getYesterday: String = { val calendar: Calendar = Calendar.getInstance calendar.set(Calendar.HOUR_OF_DAY, -24) FastDateFormat.getInstance("yyyyMMdd").format(calendar.getTime()) } } 5、报表开发(数据分析--SparkSQL)

需要处理的报表

统计各地域的数量分布情况(RegionStatProcessor) 广告投放的地域分布情况统计(RegionAnalysisProcessor) APP分布情况统计(AppAnalysisProcessor) 手机设备类型分布情况统计(DeviceAnalysisProcessor) 网络类型分布情况统计(NetworkAnalysisProcessor) 网络运营商分布情况统计(IspAnalysisProcessor) 渠道分布情况统计(ChannelAnalysisProcessor) 5.1 数据地域分布 报表处理的步骤 了解业务需求:根据省、市分组,求数据量的分布情况 源数据:为每天的日志数据,即ETL的结果数据; 目标数据:保存在本地文件中,每个报表对应目录; 编写SQL,并测试 代码实现 定义 RegionStatProcessor 继承自Processor,实现process方法。具体实现步骤如下: import cn.itbigdata.dmp.customtrait.Processor import cn.itbigdata.dmp.utils.{ConfigHolder, DateUtils} import org.apache.spark.sql.{SaveMode, SparkSession} object RegionStatProcessor extends Processor{ override def process(spark: SparkSession): Unit = { // 定义参数 val sourceDataPath = s"outputdata/maindata-${DateUtils.getYesterday}" val sinkDataPath = "output/regionstat" // 读文件 val sourceDF = spark.read.json(sourceDataPath) sourceDF.createOrReplaceTempView("adinfo") // 处理数据 val RegionSQL1 = """ |select to_date(now()) as statdate, region, city, count(*) as infocount | from adinfo |group by region, city |""".stripMargin val sinkDF = spark.sql(RegionSQL1) sinkDF.show() // 写文件 sinkDF.coalesce(1).write.mode(SaveMode.Append).json(sinkDataPath) } } 5.2 广告投放地域分布

按照需求,完成以下模式的报表

备注:要求3个率:竞价成功率、广告点击率,媒体点击率

指标计算逻辑 指标 说明 adplatformproviderid requestmode processnode iseffective isbilling isbid iswin adorderid adcreativeid 原始请求 发来的所有原始请求数 1 >=1 有效请求 满足有效体检的数量 1 >=2 广告请求 满足广告请求的请求数量 1 3 参与竞价数 参与竞价的次数 >=100000 1 1 1 !=0 竞价成功数 成功竞价的次数 >=100000 1 1 1 (广告主)展示数 针对广告主统计:广告最终在终端被展示的数量 2 1 (广告主)点击数 针对广告主统计:广告被展示后,实际被点击的数量 3 1 (媒介)展示数 针对媒介统计:广告在终端被展示的数量 2 1 1 (媒介)点击数 针对媒介统计:展示的广告实际被点击的数量 3 1 1 DSP广告消费 winprice/1000 >=100000 1 1 1 >200000 >200000 DSP广告成本 Adptment/1000 >=100000 1 1 1 >200000 >200000

DSP广告消费 = DSP的RTB的钱

DSP广告成本 = 广告主付给DSP的钱

DSP的盈利 = DSP广告成本 - DSP广告消费

备注:对应字段:

OriginalRequest、ValidRequest、adRequest bidsNum、bidsSus、bidRate adDisplayNum、adClickNum、adClickRate MediumDisplayNum、MediumClickNum、MediumClickRate adconsume、adcost

代码实现 import cn.itbigdata.dmp.customtrait.Processor import cn.itbigdata.dmp.utils.DateUtils import org.apache.spark.sql.{SaveMode, SparkSession} object RegionAnalysisProcessor extends Processor{ override def process(spark: SparkSession): Unit = { // 定义参数 val sourceDataPath = s"outputdata/maindata-${DateUtils.getYesterday}" val sinkDataPath = "outputdata/regionanalysis" // 读文件 val sourceDF = spark.read.json(sourceDataPath) sourceDF.createOrReplaceTempView("adinfo") // 处理数据 val RegionSQL1 = """ |select to_date(now()) statdate, region, city, | sum(case when requestmode=1 and processnode>=1 then 1 else 0 end) as OriginalRequest, | sum(case when requestmode=1 and processnode>=2 then 1 else 0 end) as ValidRequest, | sum(case when requestmode=1 and processnode=3 then 1 else 0 end) as adRequest, | sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and isbid=1 and adorderid!=0 | then 1 else 0 end) as bidsNum, | sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and iswin=1 | then 1 else 0 end) as bidsSus, | sum(case when requestmode=2 and iseffective=1 then 1 else 0 end) as adDisplayNum, | sum(case when requestmode=3 and iseffective=1 then 1 else 0 end) as adClickNum, | sum(case when requestmode=2 and iseffective=1 and isbilling=1 then 1 else 0 end) as MediumDisplayNum, | sum(case when requestmode=3 and iseffective=1 and isbilling=1 then 1 else 0 end) as MediumClickNum, | sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 | and iswin=1 and adorderid>200000 and adcreativeid>200000 | then winprice/1000 else 0 end) as adconsume, | sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 | and iswin=1 and adorderid>200000 and adcreativeid>200000 | then adpayment/1000 else 0 end) as adcost | from adinfo |group by region, city | """.stripMargin spark.sql(RegionSQL1).createOrReplaceTempView("tabtemp") val RegionSQL2 = """ |select statdate, region, city, | OriginalRequest, ValidRequest, adRequest, | bidsNum, bidsSus, bidsSus/bidsNum as bidRate, | adDisplayNum, adClickNum, adClickNum/adDisplayNum as adClickRate, | MediumDisplayNum, MediumClickNum, MediumClickNum/MediumDisplayNum as mediumClickRate, | adconsume, adcost | from tabtemp """.stripMargin val sinkDF = spark.sql(RegionSQL2) // 写文件 sinkDF.coalesce(1).write.mode(SaveMode.Append).json(sinkDataPath) } } 6、数据标签化 6.1 什么是数据标签化

为什么要给数据打标签

分析数据的需求 用户对与数据搜索的需求,支持定向人群的条件筛选。如: 地域,甚至是商圈 性别 年龄 兴趣 设备

数据格式

目标数据:(用户id, 所有标签)。标签如下所示:

(CH@123485 -> 1.0, KW@word -> 1.0, CT@Beijing -> 1.0, GD@女 -> 1.0, AGE@40 -> 1.0, TA@北海 -> 1.0, TA@沙滩 -> 1.0)

Tag 数据组织形式Map[String, Double] 前缀+标签;1.0为权重

需要制作的标签

广告类型 渠道 App名称 性别 地理位置 设备 关键词 年龄 商圈(暂时不管)

日志数据的标签化

计算标签(广告类型、渠道、AppName、性别 ... ...) 提取用户标识 统一用户识别 标签数据落地 6.2 搭建框架 object TagProcessor extends Processor{ override def process(spark: SparkSession, kudu: KuduContext): Unit = { // 定义参数 val sourceTableName = ConfigHolder.ADMainTableName val sinkTableName = "" val keys = "" // 1 读数据 val sourceDF = spark.read .option("kudu.master", ConfigHolder.kuduMaster) .option("kudu.table", sourceTableName) .kudu // 2 处理数据 sourceDF.rdd .map(row => { // 广告类型、渠道、App名称 val adTags = AdTypeTag.make(row) // 性别、地理位置、设备 // 关键词、年龄、商圈 }) // 3 保存数据 } }

定义接口类:

import org.apache.spark.sql.Row trait TagMaker { def make(row: Row, dic: collection.Map[String, String]=null): Map[String, Double] } 6.3 打标签 6.3.1 广告类型(AdTypeTag)

字段意义 1:banner; 2:插屏; 3:全屏

import cn.itbigdata.dmp.customtrait.TagMaker import org.apache.spark.sql.Row object AdTypeTag extends TagMaker{ private val adPrefix = "adtype@" override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = { // 1 取值 val adType: Long = row.getAs[Long]("adspacetype") // 1:banner; 2:插屏; 3:全屏 // 2 计算标签 adType match { case 1 => Map(s"${adPrefix}banner" -> 1.0) case 2 => Map(s"${adPrefix}插屏" -> 1.0) case 3 => Map(s"${adPrefix}全屏" -> 1.0) case _ => Map[String, Double]() } } } 6.3.2 渠道(ChannelTag)

字段:channelid

import cn.itbigdata.dmp.customtrait.TagMaker import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.Row object ChannelTag extends TagMaker{ private val channelPrefix = "channel@" override def make(row: Row, dic: collection.Map[String, String]=null): Map[String, Double] = { // 1 取值 val channelid = row.getAs[String]("channelid") // 2 计算标签 if (StringUtils.isNotBlank(channelid)){ Map(s"${channelPrefix}channelid" -> 1.0) } else Map[String, Double]() } def main(args: Array[String]): Unit = { // 判断某字符串是否不为空,且长度不为0,且不由空白符(空格)构成 if (!StringUtils.isNotBlank(null)) println("blank1 !") if (!StringUtils.isNotBlank("")) println("blank2 !") if (!StringUtils.isNotBlank(" ")) println("blank3 !") } } 6.3.3 App名称(AppNameTag)

字段:appid;要将 appid 转为 appname

查给定的字典表:dicapp

import cn.itbigdata.dmp.customtrait.TagMaker import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.Row object AppNameTag extends TagMaker{ // 获取前缀 private val appNamePrefix = "appname@" override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = { // 1 获取地段信息 val appId = row.getAs[String]("appid") // 2 计算并返回标签 val appName = dic.getOrElse(appId, "") if (StringUtils.isNotBlank(appName)) Map(s"${appNamePrefix}$appName" -> 1.0) else Map[String, Double]() } } 6.3.4 性别(SexTag)

字段:sex;

import cn.itbigdata.dmp.customtrait.TagMaker import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.Row object SexTag extends TagMaker{ private val sexPrefix: String = "sex@" override def make(row: Row, dic: collection.Map[String, String]=null): Map[String, Double] = { // 获取标签信息 val sexid: String = row.getAs[String]("sex") val sex = sexid match { case "1" => "男" case "2" => "女" case _ => "待填写" } // 计算返回标签 if (StringUtils.isNotBlank(sex)) Map(s"$sexPrefix$sex" -> 1.0) else Map[String, Double]() } } 6.3.5 地理位置(GeoTag)

字段:region、city

import cn.itbigdata.dmp.customtrait.TagMaker import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.Row object GeoTag extends TagMaker{ private val regionPrefix = "region@" private val cityPrefix = "city@" override def make(row: Row, dic: collection.Map[String, String]=null): Map[String, Double] = { // 获取标签信息 val region = row.getAs[String]("region") val city = row.getAs[String]("city") // 计算并返回标签信息 val regionTag = if (StringUtils.isNotBlank(region)) Map(s"$regionPrefix$region" -> 1.0) else Map[String, Double]() val cityTag = if (StringUtils.isNotBlank(city)) Map(s"$cityPrefix$city" -> 1.0) else Map[String, Double]() regionTag ++ cityTag } } 6.3.6 设备(DeviceTag)

字段:client、networkmannername、ispname;

数据字典:dicdevice

client:设备类型 (1:android 2:ios 3:wp 4:others) networkmannername:联网方式名称(2G、3G、4G、其他) ispname:运营商名称(电信、移动、联通...) import cn.itbigdata.dmp.customtrait.TagMaker import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.Row object DeviceTag extends TagMaker{ val clientPrefix = "client@" val networkPrefix = "network@" val ispPrefix = "isp@" override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = { // 获取标签信息 val clientName: String = row.getAs[Long]("client").toString val networkName: String = row.getAs[Long]("networkmannername").toString val ispName: String = row.getAs[Long]("ispname").toString // 计算并返回标签 val clientId = dic.getOrElse(clientName, "D00010004") val networkId = dic.getOrElse(networkName, "D00020005") val ispId = dic.getOrElse(ispName, "D00030004") val clientTag = if (StringUtils.isNotBlank(clientId)) Map(s"$clientPrefix$clientId" -> 1.0) else Map[String, Double]() val networkTag = if (StringUtils.isNotBlank(networkId)) Map(s"$networkPrefix$networkId" -> 1.0) else Map[String, Double]() val ispTag = if (StringUtils.isNotBlank(ispId)) Map(s"$ispPrefix$ispId" -> 1.0) else Map[String, Double]() clientTag ++ networkTag ++ ispTag } } 6.3.7 关键词(KeywordTag)

字段:keywords

import cn.itbigdata.dmp.customtrait.TagMaker import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.Row object KeywordsTag extends TagMaker{ private val keywordPrefix = "keyword@" override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = { row.getAs[String]("keywords") .split(",") .filter(word => StringUtils.isNotBlank(word)) .map(word => s"$keywordPrefix$word" -> 1.0) .toMap } } 6.3.8 年龄(AgeTag)

字段:age

import cn.itbigdata.dmp.customtrait.TagMaker import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.Row object AgeTag extends TagMaker{ private val agePrefix = "age@" override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = { val age = row.getAs[String]("age") if (StringUtils.isNotBlank(age)) Map(s"$agePrefix$age" -> 1.0) else Map[String, Double]() } } 6.3.9 主处理程序(TagProcessor) import cn.itbigdata.dmp.customtrait.Processor import cn.itbigdata.dmp.utils.DateUtils import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession object TagProcessor extends Processor{ override def process(spark: SparkSession): Unit = { // 定义参数 val sourceTableName = s"outputdata/maindata-${DateUtils.getYesterday}" val appdicFilePath = "data/dicapp" val deviceFilePath = "data/dicdevice" val sinkTableName = "" // 1 读数据 val sourceDF = spark.read.json(sourceTableName) // 读app信息(文件),转换为广播变量(优化) val appdicMap = spark.sparkContext.textFile(appdicFilePath) .map(line => { val arr: Array[String] = line.split("##") (arr(0), arr(1)) }).collectAsMap() val appdicBC: Broadcast[collection.Map[String, String]] = spark.sparkContext.broadcast(appdicMap) // 读字典信息(文件),转换为广播变量(优化) val deviceMap = spark.sparkContext.textFile(deviceFilePath) .map(line => { val arr: Array[String] = line.split("##") (arr(0), arr(1)) }).collectAsMap() val deviceBC: Broadcast[collection.Map[String, String]] = spark.sparkContext.broadcast(deviceMap) // 2 处理数据 sourceDF.printSchema() sourceDF.rdd .map(row => { // 广告类型、渠道、App名称 val adTags: Map[String, Double] = AdTypeTag.make(row) val channelTags: Map[String, Double] = ChannelTag.make(row) val appNameTags: Map[String, Double] = AppNameTag.make(row, appdicBC.value) // 性别、地理位置、设备类型 val sexTags = SexTag.make(row) val geoTags = GeoTag.make(row) val deviceTags = DeviceTag.make(row, deviceBC.value) // 关键词、年龄 val keywordsTags = KeywordsTag.make(row) val ageTags = AgeTag.make(row) // 将所有数据组成一个大的 Map 返回 val tags = adTags ++ channelTags ++ appNameTags ++ sexTags ++ geoTags ++ deviceTags ++ keywordsTags ++ ageTags tags }).collect.foreach(println) // 3 保存数据 } } 6.4 提取用户标识

日志数据针对某个用户单次特定的浏览行为

一个用户一天可能存在多条数据

标签是针对人的

存在的问题:

在数据集中抽出人的概念、让一个人能对应一条数据 在日志信息中找不到可用的用户id,只能退而求其次,找设备的信息,用设备的信息标识用户: IMEI:国际移动设备识别码(International Mobile Equipment Identity,IMEI),即通常所说的手机序列号、手机“串号”,用于在移动电话网络中识别每一部独立的手机等移动通信设备,相当于移动电话的身份证。IMEI是写在主板上的,重装APP不会改变IMEI。Android 6.0以上系统需要用户授予read_phone_state权限,如果用户拒绝就无法获得; IDFA:于iOS 6 时面世,可以监控广告效果,同时保证用户设备不被APP追踪的折中方案。可能发生变化,如系统重置、在设置里还原广告标识符。用户可以在设置里打开“限制广告跟踪”; mac地址:硬件标识符,包括WiFi mac地址和蓝牙mac地址。iOS 7 之后被禁止; OpenUDID:在iOS 5发布时,UDID被弃用了,这引起了广大开发者需要寻找一个可以替代UDID,并且不受苹果控制的方案。由此OpenUDID成为了当时使用最广泛的开源UDID替代方案。OpenUDID在工程中实现起来非常简单,并且还支持一系列的广告提供商; Android ID:在设备首次启动时,系统会随机生成一个64位的数字,并把这个数字以16进制字符串的形式保存下来,这个16进制的字符串就是ANDROID_ID,当设备被wipe后该值会被重置; 日志数据中可用于标识用户的字段包括: imei、mac、idfa、openudid、androidid imeimd5、macmd5、idfamd5、openudidmd5、androididmd5 imeisha1、macsha1、idfasha1、openudidsha1、androididsha1 什么是无效数据:以上15个字段全部为空,那么这条数据不能与任何用户发生关联,这条数据对我们来说没有任何用处,它是无效数据。这些数据需要除去。 // 15个字段同时为空时需要过滤 lazy val filterSQL: String = idFields .split(",") .map(field => s"$field != ''") .mkString(" or ") // 抽取用户标识 val userIds = getUserIds(row) // 返回标签 (userIds.head, (userIds, tags)) // 提取用户标识 private def getUserIds(row: Row): List[String] = { val userIds: List[String] = idFields.split(",") .map(field => (field, row.getAs[String](field))) .filter { case (key, value) => StringUtils.isNotBlank(value) } .map { case (key, value) => s"$key::$value" }.toList userIds } 6.5 用户识别 使用十五个字段(非空)联合标识用户 数据采集过程中: 每次采集的数据可能是不同的字段 某些字段还可能发生变化 如何识别相同用户的数据? 6.6 用户识别&数据聚合与合并 // 统一用户识别;数据聚合与合并 private def graphxAnalysis(rdd: RDD[(List[String], List[(String, Double)])]): RDD[(List[String], List[(String, Double)])] ={ // 1 定义顶点(数据结构:Long, ""; 算法:List中每个元素都可作为顶点,List本身也可作为顶点) val dotsRDD: RDD[(String, List[String])] = rdd.flatMap{ case (lst1, _) => lst1.map(elem => (elem, lst1)) } val vertexes: RDD[(Long, String)] = dotsRDD.map { case (id, ids) => (id.hashCode.toLong, "") } // 2 定义边(数据结构: Edge(Long, Long, 0)) val edges: RDD[Edge[Int]] = dotsRDD.map { case (id, ids) => Edge(id.hashCode.toLong, ids.mkString.hashCode.toLong, 0) } // 3 生成图 val graph = Graph(vertexes, edges) // 4 强连通图 val idRDD: VertexRDD[VertexId] = graph.connectedComponents() .vertices // 5 定义数据(ids与tags) val idsRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] = rdd.map { case (ids, tags) => (ids.mkString.hashCode.toLong, (ids, tags)) } // 6 步骤4的结果 与 步骤5的结果 做join,将全部的数据做了分类【一个用户一个分类】 val joinRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] = idRDD.join(idsRDD) .map { case (key, value) => value } // 7 数据的聚合(相同用户的数据放在一起) val aggRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] = joinRDD.reduceByKey { case ((bufferIds, bufferTags), (ids, tags)) => (bufferIds ++ ids, bufferTags ++ tags) } // 8 数据的合并(对于id,去重;对tags,合并权重) val resultRDD: RDD[(List[String], List[(String, Double)])] = aggRDD.map { case (key, (ids, tags)) => val newTags = tags.groupBy(x => x._1) .mapValues(lst => lst.map { case (word, weight) => weight }.sum) .toList (ids.distinct, newTags) } resultRDD } 6.7 标签落地

数据保存到kudu中,请注意:

1、每天保存一张表(需要新建),表名:usertags_当天日期

2、数据类型转换 RDD [(List[String], List[(String, Double)])] => RDD[(String, String)] => DataFrame

将 List[String] 转为 String;分隔符的定义要注意 将 List[(String, Double)] 转为String,分隔符的定义要注意 分隔符:不能与数据中的符号重复;分隔符保证要能加上,还要能去掉。 // 3 数据落地(kudu) // 将List数据类型变为String import spark.implicits._ val resultDF = mergeRDD.map{ case (ids, tags) => (ids.mkString("||"), tags.map{case (key, value) => s"$key->$value"}.mkString("||")) }.toDF("ids", "tags") DBUtils.appendData(kudu, resultDF, sinkTableName, keys) } // 获取昨天日期 6.8 标签处理代码(TagProcessor) package cn.itcast.dmp.tags import cn.itcast.dmp.Processor import cn.itcast.dmp.utils.ConfigHolder import org.apache.commons.lang3.StringUtils import org.apache.kudu.spark.kudu.{KuduContext, KuduDataFrameReader} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SparkSession} object TagProcessor extends Processor{ override def process(spark: SparkSession, kudu: KuduContext): Unit = { // 定义参数 val sourceTableName = ConfigHolder.ADMainTableName val sinkTableName = "" val keys = "" val dicAppPath = ConfigHolder.appNameDic val dicDevicePath = ConfigHolder.deviceDic val tradingAreaTableName = ConfigHolder.tradingAreaTableName val filterSQL = ConfigHolder.filterSQL val idFields = ConfigHolder.idFields // 1 读数据 val sc = spark.sparkContext val sourceDF = spark.read .option("kudu.master", ConfigHolder.kuduMaster) .option("kudu.table", sourceTableName) .kudu // 读app字典信息 val appDic = sc.textFile(dicAppPath) .map(line => { val arr = line.split("##") (arr(0), arr(1)) }) .collect() .toMap val appDicBC = sc.broadcast(appDic) // 读device字典信息 val deviceDic = sc.textFile(dicDevicePath) .map(line => { val arr = line.split("##") (arr(0), arr(1)) }) .collect() .toMap val deviceDicBC = sc.broadcast(deviceDic) // 读商圈信息(读;过滤;转为rdd;取数;收集数据到driver;转为map) // 限制条件:商圈表的信息不能过大(过滤后的大小小于20M为宜) val tradingAreaDic: Map[String, String] = spark.read .option("kudu.master", ConfigHolder.kuduMaster) .option("kudu.table", tradingAreaTableName) .kudu .filter("areas!=''") .rdd .map { case Row(geohash: String, areas: String) => (geohash, areas) } .collect() .toMap val tradingAreaBC = sc.broadcast(tradingAreaDic) // 2 处理数据 // 过滤15个标识字段都为空的数据 val userTagsRDD: RDD[(List[String], List[(String, Double)])] = sourceDF.filter(filterSQL) .rdd .map(row => { // 广告类型、渠道、App名称 val adTags = AdTypeTag.make(row) val channelTags = ChannelTag.make(row) val appNameTags = AppNameTag.make(row, appDicBC.value) // 性别、地理位置、设备 val sexTags = SexTag.make(row, appDicBC.value) val geoTags = GeoTag.make(row, appDicBC.value) val deviceTags = DeviceTag.make(row, deviceDicBC.value) // 关键词、年龄、商圈 val keywordTags = KeywordTag.make(row, appDicBC.value) val ageTags = AgeTag.make(row, appDicBC.value) val tradingAreaTags = tradingAreaTag.make(row, tradingAreaDic.value) // 标签合并 val tags = adTags ++ channelTags ++ appNameTags ++ sexTags ++ geoTags ++ deviceTags ++ keywordTags ++ ageTags ++ tradingAreaTags // 抽取用户标识 val userIds: List[String] = idFields.split(",") .map(field => (field, row.getAs[String](field))) .filter { case (key, value) => StringUtils.isNotBlank(value) } .map { case (key, value) => s"$key::$value" }.toList // 返回标签 (userIds, tags) }) userTagsRDD.foreach(println) // 3 统一用户识别,合并数据 val mergeRDD: RDD[(List[String], List[(String, Double)])] = graphxAnalysis(logTagsRDD) // 4 数据落地(kudu) // 将List数据类型变为String import spark.implicits._ val resultDF = mergeRDD.map{ case (ids, tags) => (ids.mkString("|||"), tags.map{case (key, value) => s"$key->$value"}.mkString("|||")) }.toDF("ids", "tags") DBUtils.createTableAndsaveData(kudu, resultDF, sinkTableName, keys) // 关闭资源 sc.stop() } } 7、Spark GraphX 7.1 图计算基本概念

图是用于表示对象之间模型关系的数学结构。图由顶点和连接顶点的边构成。顶点是对象,而边是对象之间的关系。

1555407826579.png

有向图是顶点之间的边是有方向的。有向图的例子如 Twitter 上的关注者。用户 Bob 关注了用户 Carol ,而 Carol 并没有关注 Bob。

1555407851059.png

就是图,通过点(对象)和边(路径),构成了不同对象之间的关系

7.2 图计算应用场景

1)最短路径

最短路径在社交网络里面,有一个六度空间的理论,表示你和任何一个陌生人之间所间隔的人不会超过五个,也就是说,最多通过五个中间人你就能够认识任何一个陌生人。这也是图算法的一种,也就是说,任何两个人之间的最短路径都是小于等于6。

2)社群发现

社群发现用来发现社交网络中三角形的个数(圈子),可以分析出哪些圈子更稳固,关系更紧密,用来衡量社群耦合关系的紧密程度。一个人的社交圈子里面,三角形个数越多,说明他的社交关系越稳固、紧密。像Facebook、Twitter等社交网站,常用到的的社交分析算法就是社群发现。

3)推荐算法(ALS)

推荐算法(ALS)ALS是一个矩阵分解算法,比如购物网站要给用户进行商品推荐,就需要知道哪些用户对哪些商品感兴趣,这时,可以通过ALS构建一个矩阵图,在这个矩阵图里,假如被用户购买过的商品是1,没有被用户购买过的是0,这时我们需要计算的就是有哪些0有可能会变成1

GraphX 通过弹性分布式属性图扩展了 Spark RDD。

通常,在图计算中,基本的数据结构表达是:

Graph = (Vertex,Edge) Vertex (顶点/节点) (VertexId: Long, info: Any) Edge (边)Edge(srcId: VertexId, dstId: VertexId, attr) 【attr 权重】 7.3 Spark GraphX例子一(强连通体) 1557105493255.png ID 关键词 AppName 1 卡罗拉 团车 2 印度尼西亚,巴厘岛 去哪儿旅游 3 善导大师 知乎 4 王的女人,美人无泪 优酷 5 世界杯 搜狐 6 刘嘉玲,港台娱乐 凤凰网 7 日韩娱乐 花椒直播 9 AK47 绝地求生:刺激战场 10 搞笑 YY直播 11 文学,时政 知乎 ID IDS 1 43125 2 43125 3 43125 4 43125 5 43125 4 4567 5 4567 6 4567 7 4567 9 91011 10 91011 11 91011

Connected Components算法(连通体算法):

1、定义顶点

2、定义边

3、生成图

4、用标注图中每个连通体,将连通体中序号最小的顶点的id作为连通体的id

任务:

定义顶点 定义边 生成图 生成强连通图 import org.apache.spark.graphx.{Edge, Graph, VertexId} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object GraphXDemo1 { def main(args: Array[String]): Unit = { // 1、初始化sparkcontext val conf = new SparkConf() .setAppName("GraphXDemo1") .setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") // 2、定义顶点 (VertexId: Long, info: Any) val vertexes: RDD[(VertexId, Map[String, Double])] = sc.makeRDD(List( (1L, Map("keyword:卡罗拉" -> 1.0, "AppName:团车" -> 1.0)), (2L, Map("keyword:印度尼西亚" -> 1.0, "keyword:巴厘岛" -> 1.0, "AppName:去哪儿旅游" -> 1.0)), (3L, Map("keyword:善导大师" -> 1.0, "AppName:知乎" -> 1.0)), (4L, Map("keyword:王的女人" -> 1.0, "keyword:美人无泪" -> 1.0, "AppName:优酷" -> 1.0)), (5L, Map("keyword:世界杯" -> 1.0, "AppName:搜狐" -> 1.0)), (6L, Map("keyword:刘嘉玲" -> 1.0, "keyword:港台娱乐" -> 1.0, "AppName:凤凰网" -> 1.0)), (7L, Map("keyword:日韩娱乐" -> 1.0, "AppName:花椒直播" -> 1.0)), (9L, Map("keyword:AK47" -> 1.0, "AppName:绝地求生:刺激战场" -> 1.0)), (10L, Map("keyword:搞笑" -> 1.0, "AppName:YY直播" -> 1.0)), (11L, Map("keyword:文学" -> 1.0, "keyword:时政" -> 1.0, "AppName:知乎" -> 1.0)) )) // 3、定义边 Edge(srcId: VertexId, dstId: VertexId, attr) val edges: RDD[Edge[Int]] = sc.makeRDD(List( Edge(1L, 42125L, 0), Edge(2L, 42125L, 0), Edge(3L, 42125L, 0), Edge(4L, 42125L, 0), Edge(5L, 42125L, 0), Edge(4L, 4567L, 0), Edge(5L, 4567L, 0), Edge(6L, 4567L, 0), Edge(7L, 4567L, 0), Edge(9L, 91011, 0), Edge(10L, 91011, 0), Edge(11L, 91011, 0) )) // 4、生成图;生成强联通图 Graph(vertexes, edges) .connectedComponents() .vertices .sortBy(_._2) .collect() .foreach(println) // 5、资源释放 sc.stop() } } 7.4 Spark GraphX例子二(用户识别&数据合并)

根据前面的例子,我们已经知道根据规则如何识别用户,程序如何处理呢?

数据的定义:

备注:

1、这里定义的数据格式与我们程序中的数据格式完全一致

2、RDD中是一个元组,第一个元素代表用户的各种 id ;第二个元素代表用户的标签

任务:

1、6条数据代表多少个用户

2、合并相同用户的数据

val dataRDD: RDD[(List[String], List[(String, Double)])] = sc.makeRDD(List( (List("a1", "b1", "c1"), List("keyword$北京" -> 1.0, "keyword$上海" -> 1.0, "area$中关村" -> 1.0)), (List("b1", "c2", "d1"), List("keyword$上海" -> 1.0, "keyword$天津" -> 1.0, "area$回龙观" -> 1.0)), (List("d1"), List("keyword$天津" -> 1.0, "area$中关村" -> 1.0)), (List("a2", "b2", "c3"), List("keyword$大数据" -> 1.0, "keyword$spark" -> 1.0, "area$西二旗" -> 1.0)), (List("b2", "c4", "d4"), List("keyword$spark" -> 1.0, "area$五道口" -> 1.0)), (List("c3", "e3"), List("keyword$hive" -> 1.0, "keyword$spark" -> 1.0, "area$西二旗" -> 1.0)) ))

完整的处理步骤:

定义顶点 定义边 生成图 找强连通体 找需要合并的数据 数据聚合 数据合并

处理程序:

import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object GraphXDemo { def main(args: Array[String]): Unit = { // 初始化 val conf: SparkConf = new SparkConf().setAppName("GraphXDemo").setMaster("local") val sc = new SparkContext(conf) sc.setLogLevel("error") // 定义数据 val dataRDD: RDD[(List[String], List[(String, Double)])] = sc.makeRDD(List( (List("a1", "b1", "c1"), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中关村" -> 1.0)), (List("b1", "c2", "d1"), List("kw$上海" -> 1.0, "kw$天津" -> 1.0, "area$回龙观" -> 1.0)), (List("d1"), List("kw$天津" -> 1.0, "area$中关村" -> 1.0)), (List("a2", "b2", "c3"), List("kw$大数据" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)), (List("b2", "c4", "d4"), List("kw$spark" -> 1.0, "area$五道口" -> 1.0)), (List("c3", "e3"), List("kw$hive" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)) )) val value: RDD[(String, List[String], List[(String, Double)])] = dataRDD.flatMap { case (allIds: List[String], tags: List[(String, Double)]) => { allIds.map { case elem: String => (elem, allIds, tags) } } } // 1 将标识信息中的每一个元素抽取出来,作为id // 备注1、这里使用了flatMap,将元素压平; // 备注2、这里丢掉了标签信息,因为这个RDD主要用于构造顶点、边,tags信息用不 // 备注3、顶点、边的数据要求Long,所以这里做了数据类型转换 val dotRDD: RDD[(VertexId, VertexId)] = dataRDD.flatMap { case (allids, tags) => // 方法一:好理解,不好写 // for (id (id.hashCode.toLong, allids.mkString.hashCode.toLong)) } // 2 定义顶点 val vertexesRDD: RDD[(VertexId, String)] = dotRDD.map { case (id, ids) => (id, "") } // 3 定义边(id: 单个的标识信息;ids: 全部的标识信息) val edgesRDD: RDD[Edge[Int]] = dotRDD.map { case (id, ids) => Edge(id, ids, 0) } // 4 生成图 val graph = Graph(vertexesRDD, edgesRDD) // 5 找到强连通体 val connectRDD: VertexRDD[VertexId] = graph.connectedComponents() .vertices // 6 定义中心点的数据 val centerVertexRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] = dataRDD.map { case (allids, tags) => (allids.mkString.hashCode.toLong, (allids, tags)) } // 7 步骤5、6的数据做join,获取需要合并的数据 val allInfoRDD = connectRDD.join(centerVertexRDD) .map { case (id1, (id2, (allIds, tags))) => (id2, (allIds, tags)) } // 8 数据聚合(即将同一个用户的标识、标签放在一起) val mergeInfoRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] = allInfoRDD.reduceByKey { case ((bufferList, bufferMap), (allIds, tags)) => val newList = bufferList ++ allIds // map 的合并 val newMap = bufferMap ++ tags (newList, newMap) } // 9 数据合并(allIds:去重;tags:合并权重) val resultRDD: RDD[(List[String], Map[String, Double])] = mergeInfoRDD.map { case (key, (allIds, tags)) => val newIds = allIds.distinct // 按照key做聚合;然后对聚合得到的lst第二个元素做累加 val newTags = tags.groupBy(x => x._1).mapValues(lst => lst.map(x => x._2).sum) (newIds, newTags) } resultRDD.foreach(println) sc.stop() } // def main(args: Array[String]): Unit = { // val lst = List( // ("kw$大数据",1.0), // ("kw$spark",1.0), // ("area$西二旗",1.0), // ("kw$spark",1.0), // ("area$五道口",1.0), // ("kw$hive",1.0), // ("kw$spark",1.0), // ("area$西二旗",1.0) // ) // // lst.groupBy(x=> x._1).map{case (key, value) => (key, value.map(x=>x._2).sum)}.foreach(println) // println("************************************************************") // // lst.groupBy(x=> x._1).mapValues(lst => lst.map(x=>x._2).sum).foreach(println) // println("************************************************************") } 8、项目总结


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3