spark期末大作业

您所在的位置:网站首页 python读取hdfs上的文件 spark期末大作业

spark期末大作业

2023-07-17 03:01| 来源: 网络整理| 查看: 265

一.需求描述

本次实验需要一台Linux系统的虚拟机,需要在Linux系统的虚拟机里运行Hadoop、Spark和Python,分别到Hadoop官网、Spark官网、下载Hadoop和Spark并且完成Hadoop及Spark搭建,建议更新apt,安装SSH、配置SSH无密码登录,安装Java环境,安装完Hadoop后需要进行单机配置、伪分布式配置和伪分布式,Hadoop 默认模式为非分布式模式(本地模式),无需进行其他配置即可运行。Hadoop 可以在单节点上以伪分布式的方式运行,Hadoop 进程以分离的 Java 进程来运行,节点既作为 NameNode 也作为 DataNode,同时,读取的是 HDFS 中的文件。非分布式即单 Java 进程,方便进行调试。上面的单机模式,grep 例子读取的是本地数据,伪分布式读取的则是 HDFS 上的数据。Hadoop的配置对实验很重要,然后使用依赖安装python3,可以把python3设置为默认路径,并且有pip3第三方库支持,因为要下载众多的第三方库。最后需要连接网络实现所需要分析的数据下载和上传。

二. 环境介绍

(1)Linux: Ubuntu 20

(2)Hadoop建议3.1.3版本,首先是创建hadoop用户,然后重启虚拟机用 创建的Hadoop用户登录,更新apt,安装SSH、配置SSH无密码登陆, 安装Java环境,然后才是安装 Hadoop3.1.3,安装完毕后进行Hadoop 单机配置、Hadoop伪分布式配置。

(3)Python: 3.8,Ubuntu如果没有联网的,只能下载好安装包上传后才做 的相应操作,要是可联网,直接apt-get install python3.6.即可。 Ubuntu20下默认系统自带Python3的版本,并没有Python2。

(4)Spark: 2.4.0 ,安装Spark之前需要先安装好Hadoop和JAVA JDK, 安装好Spark后需要配置Spark环境。Spark的安装过程较为简单,在 已安装好 Hadoop 的前提下,经过简单配置即可使用。

(5)java:jdk-8u162-linux-x64

(6)Jupyter Notebook

三.数据来源描述

2019新型冠状病毒,首先发现于中国武汉,初步的正式命名为2019-nCoV。感染者会出现程度不同的症状,最轻微的是无任何外在症状(被称为无症状感染者),有的只是发烧或轻微咳嗽,有的会发展为肺炎,严重者甚至死亡。本次作业使用的数据集来自数据网站Kaggle的美国新冠肺炎疫情数据集,该数据集以数据表us-counties.csv组织,其中包含了美国发现首例新冠肺炎确诊病例至今(2020-05-19)的相关数据。数据包含以下字段:字段名称、字段含义,如图。

四.数据上传及上传结果查看

这次的数据来源于网上,因此,可以在虚拟机里直接在网上下载数据,如果虚拟机无法联网可以先下载到主机然后开启共享文件夹以此把数据文件共享到虚拟机,如果数据文件保留在主机,而网路上无法再次找到数据文件的话可以使用邮箱形式自己给自己发送邮件,然后在虚拟机里打开邮箱获取。获取后放在用户自己的home文件夹里面。

五.数据处理过程描述

1.格式转换

原始数据集是以.csv文件组织的,为了方便spark读取生成RDD或者DataFrame,首先将us-counties.csv转换为.txt格式文件us-counties.txt。转换操作使用python实现,代码组织在toTxt.py中,具体代码如下:

hadoop@ubuntu:~/Desktop$ vim toTxt.py  #创建toTxt.py

2. 将文件上传至HDFS文件系统中

使用如下命令把本地文件系统的“/home/hadoop/us-counties.txt”上传到HDFS文件系统中,具体路径是“/user/hadoop/us-counties.txt”。具体命令如下:

 

3. 用Python作为编程语言编写完整代码

部分操作的完整实验代码存放在了analyst.py中,具体如下:

hadoop@ubuntu:/usr/local/hadoop$ vim analyst.py  #创建analyst.py

analyst.py代码如下:

from pyspark import SparkConf,SparkContext from pyspark.sql import Row from pyspark.sql.types import * from pyspark.sql import SparkSession from datetime import datetime import pyspark.sql.functions as func def toDate(inputStr):     newStr = ""     if len(inputStr) == 8:         s1 = inputStr[0:4]         s2 = inputStr[5:6]         s3 = inputStr[7]         newStr = s1+"-"+"0"+s2+"-"+"0"+s3     else:         s1 = inputStr[0:4]         s2 = inputStr[5:6]         s3 = inputStr[7:]         newStr = s1+"-"+"0"+s2+"-"+s3     date = datetime.strptime(newStr, "%Y-%m-%d")     return date #主程序: spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate() fields=[StructField("date",DateType(),False),StructField("county",StringType(),False),StructFi     eld("state",StringType(),False),StructField("cases",IntegerType(),False),StructField("d eaths",IntegerType(),False),] schema = StructType(fields) rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt") rdd1 = rdd0.map(lambdax:x.split("\t")).map(lambdap:Row(toDate(p[0]),p[1],p[2],int(p[3]), int(p[4]))) shemaUsInfo = spark.createDataFrame(rdd1,schema) shemaUsInfo.createOrReplaceTempView("usInfo") #1.计算每日的累计确诊病例数和死亡数 df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc()) #列重命名 df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths") df1.repartition(1).write.json("result1.json")                               #写入hdfs #注册为临时表供下一步使用 df1.createOrReplaceTempView("ustotal") #2.计算每日较昨日的新增确诊病例数和死亡病例数 df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)") df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json")           #写入hdfs #3.统计截止5.19日 美国各州的累计确诊人数和死亡人数 df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo  where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state") df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json") #写入hdfs df3.createOrReplaceTempView("eachStateInfo") #4.找出美国确诊最多的10个州 df4 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases desc limit 10") df4.repartition(1).write.json("result4.json") #5.找出美国死亡最多的10个州 df5 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths desc limit 10") df5.repartition(1).write.json("result5.json") #6.找出美国确诊最少的10个州 df6 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases asc limit 10") df6.repartition(1).write.json("result6.json") #7.找出美国死亡最少的10个州 df7 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths asc limit 10") df7.repartition(1).write.json("result7.json") #8.统计截止5.19全美和各州的病死率 df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache() df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("result8.json") 4. 读取文件生成DataFrame 上面已经给出了完整代码。下面我们再对代码做一些简要介绍。首先看看读取文件生成DataFrame。由于本实验中使用的数据为结构化数据,因此可以使用spark读取源文件生成DataFrame以方便进行后续分析实现。本部分代码组织在analyst.py中,读取us-counties.txt生成DataFrame的代码如下: from pyspark import SparkConf,SparkContext from pyspark.sql import Row from pyspark.sql.types import * from pyspark.sql import SparkSession from datetime import datetime import pyspark.sql.functions as func def toDate(inputStr):     newStr = ""     if len(inputStr) == 8:         s1 = inputStr[0:4]         s2 = inputStr[5:6]         s3 = inputStr[7]         newStr = s1+"-"+"0"+s2+"-"+"0"+s3     else:         s1 = inputStr[0:4]         s2 = inputStr[5:6]         s3 = inputStr[7:]         newStr = s1+"-"+"0"+s2+"-"+s3     date = datetime.strptime(newStr, "%Y-%m-%d")     return date #主程序: spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate() fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),                     StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),] schema = StructType(fields) rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt") rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4]))) shemaUsInfo = spark.createDataFrame(rdd1,schema) shemaUsInfo.createOrReplaceTempView("usInfo")

5. 进行数据分析

本实验主要统计以下8个指标,分别是:

1) 统计美国截止每日的累计确诊人数和累计死亡人数。做法是以date作为分组字段,对cases和deaths字段进行汇总统计。

2) 统计美国每日的新增确诊人数和新增死亡人数。因为新增数=今日数-昨日数,所以考虑使用自连接,连接条件是t1.date = t2.date + 1,然后使用t1.totalCases – t2.totalCases计算该日新增。

3) 统计截止5.19日,美国各州的累计确诊人数和死亡人数。首先筛选出5.19日的数据,然后以state作为分组字段,对cases和deaths字段进行汇总统计。

4) 统计截止5.19日,美国确诊人数最多的十个州。对3)的结果DataFrame注册临时表,然后按确诊人数降序排列,并取前10个州。

5) 统计截止5.19日,美国死亡人数最多的十个州。对3)的结果DataFrame注册临时表,然后按死亡人数降序排列,并取前10个州。

6) 统计截止5.19日,美国确诊人数最少的十个州。对3)的结果DataFrame注册临时表,然后按确诊人数升序排列,并取前10个州。

7) 统计截止5.19日,美国死亡人数最少的十个州。对3)的结果DataFrame注册临时表,然后按死亡人数升序排列,并取前10个州

8) 统计截止5.19日,全美和各州的病死率。病死率 = 死亡数/确诊数,对3)的结果DataFrame注册临时表,然后按公式计算。

在计算以上几个指标过程中,根据实现的简易程度,既采用了DataFrame自带的操作函数,又采用了spark sql进行操作。

6. 结果文件

上述Spark计算结果保存.json文件,方便后续可视化处理。由于使用Python读取HDFS文件系统不太方便,故将HDFS上结果文件转储到本地文件系统中,使用以下命令:

 

对于result2等结果文件,使用相同命令,只需要改一下路径即可。

7. 数据可视化

选择使用python第三方库pyecharts作为可视化工具。

在使用前,需要安装pyecharts,安装代码如下:

pip install pyecharts

具体可视化实现代码组织与showdata.py文件中。具体代码如下:

from pyecharts import options as opts from pyecharts.charts import Bar from pyecharts.charts import Line from pyecharts.components import Table from pyecharts.charts import WordCloud from pyecharts.charts import Pie from pyecharts.charts import Funnel from pyecharts.charts import Scatter from pyecharts.charts import PictorialBar from pyecharts.options import ComponentTitleOpts from pyecharts.globals import SymbolType import json #1.画出每日的累计确诊病例数和死亡数——>双柱状图 def drawChart_1(index):     root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"     date = []     cases = []     deaths = []     with open(root, 'r') as f:         while True:             line = f.readline()             if not line:                            # 到 EOF,返回空字符串,则终止循环                 break             js = json.loads(line)             date.append(str(js['date']))             cases.append(int(js['cases']))             deaths.append(int(js['deaths']))     d = (     Bar()     .add_xaxis(date)     .add_yaxis("累计确诊人数", cases, stack="stack1")     .add_yaxis("累计死亡人数", deaths, stack="stack1")     .set_series_opts(label_opts=opts.LabelOpts(is_show=False))     .set_global_opts(title_opts=opts.TitleOpts(title="美国每日累计确诊和死亡人数"))     .render("/home/hadoop/result/result1/result1.html")     ) #2.画出每日的新增确诊病例数和死亡数——>折线图 def drawChart_2(index):     root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"     date = []     cases = []     deaths = []     with open(root, 'r') as f:         while True:             line = f.readline()             if not line:                            # 到 EOF,返回空字符串,则终止循环                 break             js = json.loads(line)             date.append(str(js['date']))             cases.append(int(js['caseIncrease']))             deaths.append(int(js['deathIncrease']))     (     Line(init_opts=opts.InitOpts(width="1600px", height="800px"))     .add_xaxis(xaxis_data=date)     .add_yaxis(         series_name="新增确诊",         y_axis=cases,         markpoint_opts=opts.MarkPointOpts(             data=[                 opts.MarkPointItem(type_="max", name="最大值")             ]         ),         markline_opts=opts.MarkLineOpts(             data=[opts.MarkLineItem(type_="average", name="平均值")]         ),     )     .set_global_opts(         title_opts=opts.TitleOpts(title="美国每日新增确诊折线图", subtitle=""),         tooltip_opts=opts.TooltipOpts(trigger="axis"),         toolbox_opts=opts.ToolboxOpts(is_show=True),         xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),     )     .render("/home/hadoop/result/result2/result1.html")     )     (     Line(init_opts=opts.InitOpts(width="1600px", height="800px"))     .add_xaxis(xaxis_data=date)     .add_yaxis(         series_name="新增死亡",         y_axis=deaths,         markpoint_opts=opts.MarkPointOpts(             data=[opts.MarkPointItem(type_="max", name="最大值")]         ),         markline_opts=opts.MarkLineOpts(             data=[                 opts.MarkLineItem(type_="average", name="平均值"),                 opts.MarkLineItem(symbol="none", x="90%", y="max"),                 opts.MarkLineItem(symbol="circle", type_="max", name="最高点"),             ]         ),     )     .set_global_opts(         title_opts=opts.TitleOpts(title="美国每日新增死亡折线图", subtitle=""),         tooltip_opts=opts.TooltipOpts(trigger="axis"),         toolbox_opts=opts.ToolboxOpts(is_show=True),         xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),     )     .render("/home/hadoop/result/result2/result2.html")     ) #3.画出截止5.19,美国各州累计确诊、死亡人数和病死率--->表格 def drawChart_3(index):     root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"     allState = []     with open(root, 'r') as f:         while True:             line = f.readline()             if not line:                            # 到 EOF,返回空字符串,则终止循环                 break             js = json.loads(line)             row = []             row.append(str(js['state']))             row.append(int(js['totalCases']))             row.append(int(js['totalDeaths']))             row.append(float(js['deathRate']))             allState.append(row)     table = Table()     headers = ["State name", "Total cases", "Total deaths", "Death rate"]     rows = allState     table.add(headers, rows)     table.set_global_opts(         title_opts=ComponentTitleOpts(title="美国各州疫情一览", subtitle="")     )     table.render("/home/hadoop/result/result3/result1.html") #4.画出美国确诊最多的10个州——>词云图 def drawChart_4(index):     root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"     data = []     with open(root, 'r') as f:         while True:             line = f.readline()             if not line:                            # 到 EOF,返回空字符串,则终止循环                 break             js = json.loads(line)             row=(str(js['state']),int(js['totalCases']))             data.append(row)     c = (     WordCloud()     .add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)     .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊Top10"))     .render("/home/hadoop/result/result4/result1.html")     ) #5.画出美国死亡最多的10个州——>象柱状图 def drawChart_5(index):     root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"     state = []     totalDeath = []     with open(root, 'r') as f:         while True:             line = f.readline()             if not line:                            # 到 EOF,返回空字符串,则终止循环                 break             js = json.loads(line)             state.insert(0,str(js['state']))             totalDeath.insert(0,int(js['totalDeaths']))     c = (     PictorialBar()     .add_xaxis(state)     .add_yaxis(         "",         totalDeath,         label_opts=opts.LabelOpts(is_show=False),         symbol_size=18,         symbol_repeat="fixed",         symbol_offset=[0, 0],         is_symbol_clip=True,         symbol=SymbolType.ROUND_RECT,     )     .reversal_axis()     .set_global_opts(         title_opts=opts.TitleOpts(title="PictorialBar-美国各州死亡人数Top10"),         xaxis_opts=opts.AxisOpts(is_show=False),         yaxis_opts=opts.AxisOpts(             axistick_opts=opts.AxisTickOpts(is_show=False),             axisline_opts=opts.AxisLineOpts(                 linestyle_opts=opts.LineStyleOpts(opacity=0)             ),         ),     )     .render("/home/hadoop/result/result5/result1.html")     ) #6.找出美国确诊最少的10个州——>词云图 def drawChart_6(index):     root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"     data = []     with open(root, 'r') as f:         while True:             line = f.readline()             if not line:                            # 到 EOF,返回空字符串,则终止循环                 break             js = json.loads(line)             row=(str(js['state']),int(js['totalCases']))             data.append(row)     c = (     WordCloud()     .add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)     .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊最少的10个州"))     .render("/home/hadoop/result/result6/result1.html")     ) #7.找出美国死亡最少的10个州——>漏斗图 def drawChart_7(index):     root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"     data = []     with open(root, 'r') as f:         while True:             line = f.readline()             if not line:                            # 到 EOF,返回空字符串,则终止循环                 break             js = json.loads(line)             data.insert(0,[str(js['state']),int(js['totalDeaths'])])     c = (     Funnel()     .add(         "State",         data,         sort_="ascending",         label_opts=opts.LabelOpts(position="inside"),     )     .set_global_opts(title_opts=opts.TitleOpts(title=""))     .render("/home/hadoop/result/result7/result1.html")     ) #8.美国的病死率--->饼状图 def drawChart_8(index):     root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"     values = []     with open(root, 'r') as f:         while True:             line = f.readline()             if not line:                            # 到 EOF,返回空字符串,则终止循环                 break             js = json.loads(line)             if str(js['state'])=="USA":                 values.append(["Death(%)",round(float(js['deathRate'])*100,2)])   values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)])     c = (     Pie()     .add("", values)     .set_colors(["blcak","orange"])     .set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率"))     .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))     .render("/home/hadoop/result/result8/result1.html")     ) #可视化主程序: index = 1 while index双柱状图

美国每日的新增确诊病例数——>折线图

美国每日的新增死亡病例数——>折线图

截止5.19,美国各州累计确诊、死亡人数和病死率—>表格

 

截止5.19,美国累计确诊人数前10的州—>词云图

 

截止5.19,美国累计死亡人数前10的州—>象柱状图

 

截止5.19,美国累计确诊人数最少的10个州—>词云图

 

截止5.19,美国累计死亡人数最少的10个州—>漏斗图

 

         截止5.19,美国的病死率—>饼状图

六、经验总结

这次的实验可以说是千辛万苦,原以为虚拟机的Ubuntu环境是完美的,结果发现从学校拿过来的镜像是有问题的,无奈之下只能去网上下载一个新的镜像,直接一夜回到解放前。光是搭建环境就耗费了我一天的时间,让我很明显的发现之前学到的知识已经逐渐生疏,原本一小时就能搭建完成的环境硬是被我耗成了一天。其实这次实验并不困难,就是我们平时做的数据可视化分析,但就是换了一下环境就让我乱了阵脚这是这次实验的失败之处。这次实验让我深深的感受到了自己的专业技能不够熟练,日后会继续打磨自己,让自己的技术更上一层楼。

 

参考文献

林子雨.2020年美国新冠肺炎疫情数据分析_厦大数据库实验室博客

(xmu.edu.cn)

林子雨.Hadoop3.1.3安装教程_单机/伪分布式配置_Hadoop3.1.3/

Ubuntu18.04(16.04)_厦大数据库实验室博客 (xmu.edu.cn)

林子雨.Spark安装和编程实践(Spark2.4.0)_厦大数据库实验室博客

(xmu.edu.cn)

林子雨.使用Jupyter Notebook调试PySpark程序_厦大数据库实验室博客

(xmu.edu.cn)

 

 

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3