图解大数据

您所在的位置:网站首页 使用spark对数据进行分析 图解大数据

图解大数据

2023-06-01 07:39| 来源: 网络整理| 查看: 265

作者:韩信子@ShowMeAI 教程地址:www.showmeai.tech/tutorials/8… 本文地址:www.showmeai.tech/article-det… 声明:版权所有,转载请联系平台与作者并注明出处 引言

电商与新零售是目前大数据与AI应用最广泛的场景之一,本案例以跨国在线零售业务为背景,讲解使用pyspark对HDFS存储的数据进行交易数据分析的过程,并且对分析结果使用echarts做了可视化呈现。

1.环境搭建

本次作业使用的环境和软件如下:

(1)Linux操作系统:Ubuntu 16.04 (2)Python:3.8 (3)Hadoop:3.1.3 (4)Spark:2.4.0 (5)Bottle:v0.13

Bottle是一个快速、简洁、轻量级的基于WSIG的微型Web框架,此框架除了Python的标准库外,不依赖任何其他模块。安装方法是,打开Linux终端,执行如下命令:

sudo apt-get install python3-pip pip3 install bottle 复制代码 2.数据预处理

本案例的数据集来自Kaggle平台,是跨国在线零售业务交易数据,大家可以通过如下的百度网盘地址下载。

数据集和源代码下载(百度网盘) 链接:pan.baidu.com/s/1zg2MoNNZ… 提取码:show

这是一家在英国注册的公司,主要销售礼品。数据集 E_Commerce_Data.csv 包含541909条记录,时间跨度为2010-12-01到2011-12-09,每个记录由8个属性组成,具体的含义如下表所示:

字段名称类型含义举例InvoiceNostring订单编号(退货订单以C开头)536365StockCodestring产品代码85123ADescriptionstring产品描述WHITE METAL LANTERNQuantityinteger购买数量(负数表示退货)6InvoiceDatestring订单日期和时间12/1/2010 8:26UnitPricedouble单价(英镑)3.39CustomerIDinteger客户编号17850Countrystring国家名称United Kingdom

我们先将数据集E_Commerce_Data.csv上传至hdfs上,命令如下:

hdfs dfs -put E_Commerce_Data.csv 复制代码

大家可以通过如下命令进入pyspark的交互式编程环境,或者在配置好pyspark的jupyter Notebook中,对数据进行初步探索和清洗:

cd /usr/local/spark #进入Spark安装目录 ./bin/pyspark 复制代码

(1)读取在HDFS上的文件,以csv的格式读取,得到DataFrame对象

df=spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('E_Commerce_Data.csv') 复制代码

(2)查看数据集的大小,输出541909,不包含标题行

df.count() 复制代码

(3)打印数据集的schema,查看字段及其类型信息。输出内容就是上文中的属性表

df.printSchema() 复制代码

(4)创建临时视图data

df.createOrReplaceTempView("data") 复制代码

(5)由于顾客编号CustomID和商品描述 Description 均存在部分缺失,所以进行数据清洗,过滤掉有缺失值的记录。特别地,由于 CustomID为integer 类型,所以该字段若为空,则在读取时被解析为0,故用 df[“CustomerID”]!=0 条件过滤。

clean=df.filter(df["CustomerID"]!=0).filter(df["Description"]!="") 复制代码

(6)查看清洗后的数据集的大小,输出406829。

clean.count() 复制代码

(7)将清洗后的文件以csv的格式,写入 E_Commerce_Data_Clean.csv 中(实际上这是目录名,真正的文件在该目录下,文件名类似于 part-00000,需要确保HDFS中不存在这个目录,否则写入时会报“already exists”错误)

clean.write.format("com.databricks.spark.csv").options(header='true',inferschema='true').save('E_Commerce_Data_Clean.csv') 复制代码 3.数据分析

数据集和源代码下载(百度网盘) 链接:pan.baidu.com/s/1zg2MoNNZ… 提取码:show

我们构建一个总体的分析脚本 sales_data_analysis.py ,先导入需要用到的python模块。

获取数据集与代码 → ShowMeAI的官方GitHub github.com/ShowMeAI-Hu… 运行代码段与学习 → 在线编程环境 blog.showmeai.tech/python3-com… # -*- coding: utf-8 -*- from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import StringType, DoubleType, IntegerType, StructField, StructType import json import os 复制代码

接着初始化SparkSession对象。

sc = SparkContext('local', 'spark_project') sc.setLogLevel('WARN') spark = SparkSession.builder.getOrCreate() 复制代码

之后从HDFS中以csv的格式读取清洗后的数据目录 E_Commerce_Data_Clean.csv ,spark得到DataFrame对象,并创建临时视图data用于后续分析。

df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('E_Commerce_Data_Clean.csv') df.createOrReplaceTempView("data") 复制代码

为方便统计结果的可视化,将结果导出为json文件供web页面渲染。使用save方法导出数据:

def save(path, data): with open(path, 'w') as f: f.write(data) 复制代码 1)数据概览 (1)客户数最多的10个国家

每个客户由编号 CustomerID 唯一标识,所以客户的数量为 COUNT(DISTINCT CustomerID) ,再按照国家 Country 分组统计,根据客户数降序排序,筛选出10个客户数最多的国家。得到的countryCustomerDF为DataFrame 类型,执行 collect() 方法即可将结果以数组的格式返回。

def countryCustomer(): countryCustomerDF = spark.sql("SELECT Country,COUNT(DISTINCT CustomerID) AS countOfCustomer FROM data GROUP BY Country ORDER BY countOfCustomer DESC LIMIT 10") return countryCustomerDF.collect() 复制代码

最后调用save方法就可以将结果导出至文件了,格式如下:

[国家名称,客户数] 复制代码 (2)销量最高的10个国家

Quantity字段表示销量,因为退货的记录中此字段为负数,所以使用 SUM(Quantity) 即可统计出总销量,即使有退货的情况。再按照国家 Country 分组统计,根据销量降序排序,筛选出10个销量最高的国家。得到的 countryQuantityDF 为DataFrame类型,执行 collect() 方法即可将结果以数组的格式返回。

def countryQuantity(): countryQuantityDF = spark.sql("SELECT Country,SUM(Quantity) AS sumOfQuantity FROM data GROUP BY Country ORDER BY sumOfQuantity DESC LIMIT 10") return countryQuantityDF.collect() 复制代码

最后调用save方法就可以将结果导出至文件了,格式如下:

[国家名称,销量] 复制代码 (3)各个国家的总销售额分布情况

UnitPrice 字段表示单价,Quantity 字段表示销量,退货的记录中 Quantity 字段为负数,所以使用 SUM(UnitPrice*Quantity) 即可统计出总销售额,即使有退货的情况。再按照国家 Country 分组统计,计算出各个国家的总销售额。得到的 countrySumOfPriceDF 为 DataFrame 类型,执行 collect() 方法即可将结果以数组的格式返回。

def countrySumOfPrice(): countrySumOfPriceDF = spark.sql("SELECT Country,SUM(UnitPrice*Quantity) AS sumOfPrice FROM data GROUP BY Country") return countrySumOfPriceDF.collect() 复制代码

最后调用save方法就可以将结果导出至文件了,格式如下:

[国家名称,总销售额] 复制代码 (4)销量最高的10个商品

Quantity 字段表示销量,退货的记录中 Quantity 字段为负数,所以使用 SUM(Quantity) 即可统计出总销量,即使有退货的情况。再按照商品编码 StockCode 分组统计,计算出各个商品的销量。得到的 stockQuantityDF 为 DataFrame 类型,执行 collect() 方法即可将结果以数组的格式返回。

def stockQuantity(): stockQuantityDF = spark.sql("SELECT StockCode,SUM(Quantity) AS sumOfQuantity FROM data GROUP BY StockCode ORDER BY sumOfQuantity DESC LIMIT 10") return stockQuantityDF.collect() 复制代码

最后调用save方法就可以将结果导出至文件了,格式如下:

[商品编号,销量] 复制代码 (5)商品描述的热门关键词Top300

Description字段表示商品描述,由若干个单词组成,使用 LOWER(Description) 将单词统一转换为小写。此时的结果为 DataFrame 类型,转化为 rdd 后进行词频统计,再根据单词出现的次数进行降序排序,流程图如下:

得到的结果为RDD类型,为其制作表头 wordCountSchema,包含 word 和 count 属性,分别为 string 类型和 integer 类型。调用 createDataFrame() 方法将其转换为 DataFrame 类型的 wordCountDF,将word为空字符串的记录剔除掉,调用 take() 方法得到出现次数最多的300个关键 词,以数组的格式返回。

def wordCount(): wordCount = spark.sql("SELECT LOWER(Description) as description from data").rdd.flatMap(lambda line:line['description'].split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b).repartition(1).sortBy(lambda x:x[1],False) wordCountSchema = StructType([StructField("word", StringType(), True),StructField("count", IntegerType(), True)]) wordCountDF = spark.createDataFrame(wordCount, wordCountSchema) wordCountDF = wordCountDF.filter(wordCountDF["word"]!='') return wordCountDF.take(300) 复制代码

最后调用save方法就可以将结果导出至文件了,格式如下:

[关键词,次数] 复制代码 (6)退货订单数最多的10个国家

InvoiceNo 字段表示订单编号,所以订单总数为 COUNT(DISTINCT InvoiceNo),由于退货订单的编号的首个字母为C,例如C540250,所以利用 WHERE InvoiceNo LIKE ‘C%’ 子句即可筛选出退货的订单,再按照国家Country分组统计,根据退货订单数降序排序,筛选出10个退货订单数最多的国家。得到的 countryReturnInvoiceDF 为 DataFrame 类型,执行 collect() 方法即可将结果以数组的格式返回。

def countryReturnInvoice(): countryReturnInvoiceDF = spark.sql("SELECT Country,COUNT(DISTINCT InvoiceNo) AS countOfReturnInvoice FROM data WHERE InvoiceNo LIKE 'C%' GROUP BY Country ORDER BY countOfReturnInvoice DESC LIMIT 10") return countryReturnInvoiceDF.collect() 复制代码

最后调用save方法就可以将结果导出至文件了,格式如下:

[国家名称,退货订单数] 复制代码 2)关联分析 (7)月销售额随时间的变化趋势

统计月销售额需要3个字段的信息,分别为订单日期 InvoiceDate,销量Quantity和单价UnitPrice。由于InvoiceDate字段格式不容易处理,例如“8/5/2011 16:19”,所以需要对这个字段进行格式化操作。由于统计不涉及小时和分钟数,所以只截取年月日部分,并且当数值小于10时补前置0来统一格式,期望得到年、月、日3个独立字段。先实现 formatData() 方法,利用 rdd 对日期、销量和单价字段进行处理。

def formatData(): tradeRDD = df.select("InvoiceDate","Quantity","UnitPrice",).rdd result1 = tradeRDD.map(lambda line: (line['InvoiceDate'].split(" ")[0], line['Quantity'] , line['UnitPrice'])) result2 = result1.map(lambda line: (line[0].split("/"), line[1], line[2])) result3 = result2.map(lambda line: (line[0][2], line[0][0] if len(line[0][0])==2 else "0"+line[0][0], line[0][1] if len(line[0][1])==2 else "0"+line[0][1], line[1], line[2])) return result3 复制代码

流程图如下:

由于要统计的是月销售额的变化趋势,所以只需将日期转换为“2011-08”这样的格式即可。而销售额表示为单价乘以销量,需要注意的是,退货时的销量为负数,所以对结果求和可以表示销售额。RDD的转换流程如下:

得到的结果为RDD类型,为其制作表头 schema,包含 date 和 tradePrice 属性,分别为string类型和double类型。调用 createDataFrame() 方法将其转换为DataFrame类型的 tradePriceDF ,调用 collect() 方法将结果以数组的格式返回。

def tradePrice(): result3 = formatData() result4 = result3.map(lambda line:(line[0]+"-"+line[1],line[3]*line[4])) result5 = result4.reduceByKey(lambda a,b:a+b).sortByKey() schema = StructType([StructField("date", StringType(), True),StructField("tradePrice", DoubleType(), True)]) tradePriceDF = spark.createDataFrame(result5, schema) return tradePriceDF.collect() 复制代码

最后调用save方法就可以将结果导出至文件了,格式如下:

[日期,销售额] 复制代码 (8)日销量随时间的变化趋势

由于要统计的是日销量的变化趋势,所以只需将日期转换为“2011-08-05”这样的格式即可。先调用上例的 formatData() 方法对日期格式进行格式化。RDD的转换流程如下:

得到的结果为RDD类型,为其制作表头schema,包含date和saleQuantity属性,分别为string类型和integer类型。调用createDataFrame()方法将其转换为DataFrame类型的saleQuantityDF,调用collect() 方法将结果以数组的格式返回。

def saleQuantity(): result3 = formatData() result4 = result3.map(lambda line:(line[0]+"-"+line[1]+"-"+line[2],line[3])) result5 = result4.reduceByKey(lambda a,b:a+b).sortByKey() schema = StructType([StructField("date", StringType(), True),StructField("saleQuantity", IntegerType(), True)]) saleQuantityDF = spark.createDataFrame(result5, schema) return saleQuantityDF.collect() 复制代码

最后调用save方法就可以将结果导出至文件了,格式如下:

[日期,销量] 复制代码 (9)各国的购买订单量和退货订单量的关系

InvoiceNo 字段表示订单编号,退货订单的编号的首个字母为C,例如C540250。利用 COUNT(DISTINCT InvoiceNo) 子句统计订单总量,再分别用 WHERE InvoiceNo LIKE ‘C%’ 和 WHERE InvoiceNo NOT LIKE ‘C%’ 统计出退货订单量和购买订单量。接着按照国家 Country 分组统计,得到的returnDF和buyDF均为DataFrame类型,分别表示退货订单和购买订单,如下所示:

再对这两个DataFrame执行join操作,连接条件为国家Country相同,得到一个DataFrame。但是这个DataFrame中有4个属性,包含2个重复的国家Country属性和1个退货订单量和1个购买订单量,为减少冗余,对结果筛选3个字段形成buyReturnDF。如下所示:

最后执行collect()方法即可将结果以数组的格式返回。

def buyReturn(): returnDF = spark.sql("SELECT Country AS Country,COUNT(DISTINCT InvoiceNo) AS countOfReturn FROM data WHERE InvoiceNo LIKE 'C%' GROUP BY Country") buyDF = spark.sql("SELECT Country AS Country2,COUNT(DISTINCT InvoiceNo) AS countOfBuy FROM data WHERE InvoiceNo NOT LIKE 'C%' GROUP BY Country2") buyReturnDF = returnDF.join(buyDF, returnDF["Country"] == buyDF["Country2"], "left_outer") buyReturnDF = buyReturnDF.select(buyReturnDF["Country"],buyReturnDF["countOfBuy"],buyReturnDF["countOfReturn"]) return buyReturnDF.collect() 复制代码

最后调用save方法就可以将结果导出至文件了,格式如下:

[国家名称,购买订单数,退货订单数] 复制代码 (10)商品的平均单价与销量的关系

由于商品的单价UnitPrice是不断变化的,所以使用平均单价AVG(DISTINCT UnitPrice)来衡量一个商品。再利用SUM(Quantity)计算出销量,将结果按照商品的编号进行分组统计,执行collect()方法即可将结果以数组的格式返回。

def unitPriceSales(): unitPriceSalesDF = spark.sql("SELECT StockCode,AVG(DISTINCT UnitPrice) AS avgUnitPrice,SUM(Quantity) AS sumOfQuantity FROM data GROUP BY StockCode") return unitPriceSalesDF.collect() 复制代码

最后调用save方法就可以将结果导出至文件了,格式如下:

[商品编号,平均单价,销量] 复制代码 3)小结

在sales_data_analysis.py中添加main函数,将上面的分析过程整合起来方便进行调用,代码如下:

获取数据集与代码 → ShowMeAI的官方GitHub github.com/ShowMeAI-Hu… 运行代码段与学习 → 在线编程环境 blog.showmeai.tech/python3-com… if __name__ == "__main__": base = "static/" if not os.path.exists(base): os.mkdir(base) m = { "countryCustomer": { "method": countryCustomer, "path": "countryCustomer.json" }, "countryQuantity": { "method": countryQuantity, "path": "countryQuantity.json" }, "countrySumOfPrice": { "method": countrySumOfPrice, "path": "countrySumOfPrice.json" }, "stockQuantity": { "method": stockQuantity, "path": "stockQuantity.json" }, "wordCount": { "method": wordCount, "path": "wordCount.json" }, "countryReturnInvoice": { "method": countryReturnInvoice, "path": "countryReturnInvoice.json" }, "tradePrice": { "method": tradePrice, "path": "tradePrice.json" }, "saleQuantity": { "method": saleQuantity, "path": "saleQuantity.json" }, "buyReturn": { "method": buyReturn, "path": "buyReturn.json" }, "unitPriceSales": { "method": unitPriceSales, "path": "unitPriceSales.json" } } for k in m: p = m[k] f = p["method"] save(base + m[k]["path"], json.dumps(f())) print ("done -> " + k + " , save to -> " + base + m[k]["path"]) 复制代码

上面的代码将所有的函数整合在变量 m中,通过循环调用上述所有方法并导出json文件到当前路径的static目录下。 最后利用如下指令运行分析程序:

cd /usr/local/spark ./bin/spark-submit sales_data_analysis.py 复制代码 4.数据可视化

本项目可视化使用百度开源的免费数据展示框架Echarts。Echarts是一个纯Javascript的图表库,可以流畅地运行在PC和移动设备上,兼容当前绝大部分浏览器,底层依赖轻量级的Canvas类库ZRender,提供直观,生动,可交互,可高度个性化定制的数据可视化图表。

获取数据集与代码 → ShowMeAI的官方GitHub github.com/ShowMeAI-Hu… 运行代码段与学习 → 在线编程环境 blog.showmeai.tech/python3-com…

编写web.py程序,实现一个简单的web服务器,代码如下:

from bottle import route, run, static_file import json @route('/static/') def server_static(filename): return static_file(filename, root="./static") @route("/") def server_page(name): return static_file(name, root=".") @route("/") def index(): return static_file("index.html", root=".") run(host="0.0.0.0", port=8888) 复制代码

bottle服务器对接收到的请求进行路由,规则如下:

(1)访问/static/时,返回静态文件 (2)访问/.html时,返回网页文件 (3)访问/时,返回首页index.html

服务器的8888端口监听来自任意ip的请求(前提是请求方能访问到这台服务器)。

首页index.html的主要代码如下(由于篇幅较大,只截取主要的部分)

E-Commerce-Data 在线零售业务数据分析 /* 省略 */ /* 只展示第一个统计结果的代码,其余省略 */ (1) 客户数最多的10个国家 ——英国的客户最多,达到3950个,数量远大于其他国家;其次是德国、法国、西班牙等 document.body.clientHeight; 复制代码

图表页通过一个iframe嵌入到首页中。以第一个统计结果的网页countryCustomer.html为例,展示主要代码:

/* 省略 */ var myChart = echarts.init(document.getElementById('chart')); myChart.setOption( { color: ['#3398DB'], tooltip: { trigger: 'axis', axisPointer: { type: 'shadow' } }, grid: { left: '3%', right: '4%', bottom: '3%', containLabel: true }, xAxis: [ { name: '国家', data: [], axisTick: { alignWithLabel: true }, axisLabel: { interval:0, rotate:40 } } ], yAxis: [ { name: '客户数', } ], series: [ { name: '客户数', type: 'bar', barWidth: '60%', data: [] } ] }); myChart.showLoading(); $.getJSON("/static/countryCustomer.json", data => { var names=[]; var nums=[]; data = data.map(v => ({ country: v[0], customer: parseInt(v[1]), })) for(var i=0;i


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3