基于地震数据的Spark数据处理与分析 |
您所在的位置:网站首页 › 准确预测地震案例分析 › 基于地震数据的Spark数据处理与分析 |
![]() 【版权声明】版权所有,严禁转载,严禁用于商业用途,侵权必究。 作者:厦门大学信息学院计算机科学系2019级研究生 周伟敬 指导老师:厦门大学数据库实验室 林子雨 博士/副教授 相关教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》(访问教材官网) 相关案例:基于Python语言的Spark数据处理分析案例集锦(PySpark) 本案例针对全球重大地震数据进行分析,采用Python为编程语言,采用Hadoop存储数据,采用Spark对数据进行处理分析,并对结果进行数据可视化。 一、 实验环境搭建(1)Linux:Ubuntu 16.04 (2)Hadoop:3.1.3(安装教程) (3)Spark:2.4.0(安装教程) (4)Python:Anaconda3(Anaconda及Jupyter Notebook安装教程) (5)运行环境:Jupyter Notebook 此外,还需要安装plotly用于绘制地图,安装wordcloud用于绘制词云图。 conda install plotly pip install -i https://pypi.tuna.tsinghua.edu.cn/simple wordcloud本博客提供了本实验的全部源代码,请到百度网盘下载,下载以后用Jupyter Notebook打开代码文件,调试运行。 下载百度网盘地址为:https://pan.baidu.com/s/1_Trxk1Dbs6R716fx2Vh3jA 提取码: 2hza 二、 数据预处理数据来自和鲸社区的1965-2016全球重大地震数据,文件名为earthquake.csv包括23412条地震数据,其中有很多属性大部分缺失且本次实验用不到,将其手动删除,只保留Date, Time, Latitude, Longitude, Type, Depth, Magnitude这七个属性。 数据集下载百度网盘地址为:https://pan.baidu.com/s/1_Trxk1Dbs6R716fx2Vh3jA 提取码: 2hza 本博客中的Python代码均运行在Jupyter Notebook上。 在用户目录下创建一个NoteBook文件夹,将earthquake.csv文件放入NoteBook中,接着打开Jupyter Notebook。 cd NoteBook jupyter notebook此时会自动打开浏览器并进入Jupyter Notebook的主页,点击右上角的new按钮创建一个新的Python3文件。
在代码框中输入代码,运行后可以直接显示结果。首先从文件中读取数据: import pandas as pd import numpy as np rawData = pd.read_csv('~/NoteBook/earthquake.csv') rawData.head(10)
Pandas提供了to_datetime()方法,可以将不同的日期格式转换为datetime 类型。设置errors='coerce'参数,遇到不能转换的格式时将其置为NaN。 rawData['Structed Date'] = pd.to_datetime(rawData['Date'], format = "%m/%d/%Y", errors='coerce').dt.date rawData['Structed Date'].loc[rawData['Structed Date'].isnull()]
接着按照同样地步骤对Time数据进行清洗。 # 对时间进行格式转换 rawData['Structed Time'] = pd.to_datetime(rawData['Time'], format = "%H:%M:%S", errors='coerce').dt.time rawData['Structed Time'].loc[rawData['Structed Time'].isnull()] rawData['Time'].loc[[3378,7512,20650]] rawData.loc[3378, 'Structed Time'] = '2:58:41' rawData.loc[7512, 'Structed Time'] = '2:53:41' rawData.loc[20650, 'Structed Time'] = '2:23:34' del rawData['Time']修改完成后,将列名重新改回Date和Time。 rawData.rename(columns={'Structed Date': 'Date', 'Structed Time': 'Time'}, inplace=True) 2. 根据经纬度获取地名高德地图的逆地理编码API服务可以根据经纬度查询出该位置的地名信息。首先需要在高德开放平台注册成为个人开发者;
经过测试,这个API对于中国境外的坐标不能准确地返回结果,因此只获取在中国境内的坐标信息。坐标在中国境外时,就返回一个空值。如果坐标在海上,就获取海域的名称(如南海);如果在陆地上,就获取省份的名称。 编写程序为每个坐标获取地名信息,并将其存到Area列中。这个过程大概需要花费10分钟。 import requests import json def getProvince(lon, lat): u1 = 'http://restapi.amap.com/v3/geocode/regeo?output=json&' key = '&key=**********' // 星号处填入刚才在高德开放平台获取到的Key location = 'location=' + str(lon) + ','+ str(lat) url = u1 + location + key res = requests.get(url) json_data = json.loads(res.text) regeoinfo = json_data['regeocode']['addressComponent'] if 'country' in regeoinfo and regeoinfo['country'] == '中国': if 'province' in regeoinfo and regeoinfo['province']: return regeoinfo['province'] elif 'seaArea' in regeoinfo and regeoinfo['seaArea']: return regeoinfo['seaArea'] return None for i in range(23412): lon = rawData.loc[i, 'Longitude'] lat = rawData.loc[i, 'Latitude'] rawData.loc[i, 'Area'] = getProvince(lon, lat)查看获取到的地名信息。 rawData["Area"].unique()查看Type数据中是否存在异常值。 rawData.Type.unique()
至此,数据清洗工作完成,将数据保存到earthquake_cleaned.csv文件中。使用utf-8编码防止Spark读取时出现中文乱码,设置index=False防止将第一列索引保存至文件中。 rawData.to_csv("earthquake_cleaned.csv", encoding='utf-8', index=False)上传到HDFS以便Spark进行数据分析。 # 首先启动hadoop cd /usr/local/hadoop ./sbin/start-dfs.sh # 将文件上传到HDFS ./bin/hdfs dfs –mkdir –p /user/[username] # username处填入当前Linux系统登录的用户名 ./bin/hdfs dfs –mkdir input ./bin/hdfs dfs -put ~/NoteBook/earthquake_cleaned.csv input 三、 Spark数据分析用Spark读取earthquake_cleaned.csv文件创建DataFrame。 from pyspark import SparkContext,SparkConf from pyspark.sql import SparkSession spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate() df = spark.read.csv("input/earthquake_cleaned.csv", header=True, inferSchema=True) df = df.repartition(1) df.show(10)
首先要对Date属性进行拆分,分别得到年月日的信息。 attrsName = ['Year', 'Month', 'Day'] for i in range(len(attrsName)): df = df.withColumn(attrsName[i], split(df['Date'], "-")[i]) df.printSchema()将Date属性拆分后发现年月日信息都是字符串类型,将其改为整型。 for x in attrsName: df = df.withColumn(x, df[x].cast('int'))接下来对数据分别按年月日计数并将结果保存到文件中,方便之后进行可视化。由于使用Spark自带的函数将DataFrame保存为csv文件时,文件会保存为文件夹,在本地读取时比较麻烦。因此使用toPandas()方法将Spark的DataFrame转换成pandas的DataFrame,再保存为csv文件,方便可视化时读取。 countByYear = df.groupBy("Year").count().orderBy("Year") countByYear.toPandas().to_csv("countByYear.csv", encoding='utf-8', index=False) countByMonth = df.groupBy("Month").count().orderBy("Month") countByMonth.toPandas().to_csv("countByMonth.csv", encoding='utf-8', index=False) countByDay = df.groupBy("Day").count().orderBy("Day") countByDay.toPandas().to_csv("countByDay.csv", encoding='utf-8', index=False) 2. 中国境内每个省份(海域)发生重大地震的次数首先筛选出中国境内发生的重大地震信息,保存到文件中。 earthquakeC = df.filter("Area is not null") earthquakeC.toPandas().to_csv("earthquakeC.csv", encoding='utf-8', index=False)按Area属性对数据进行分组计数,要注意只统计Area非空的数据。 countByArea = earthquakeC.groupBy("Area").count() countByArea.toPandas().to_csv("countByArea.csv", encoding='utf-8', index=False) 3. 不同类型地震的数量分别计算出中国境内和世界范围内的不同地震类型的数量。 countByTypeC = earthquakeC.groupBy("Type").count() countByTypeC.toPandas().to_csv("countByTypeC.csv", encoding='utf-8', index=False) 4. 震级前500的地震根据震级进行降序排序,个人比较关注离现在更近的地震,因此再根据年份降序排列。由于使用take(500)之后返回一个RDD,其中每个元素都是Row对象。此时根据这个RDD生成DataFrame,再将其保存为csv文件。 mostPow = df.sort(df["Magnitude"].desc(), df["Year"].desc()).take(500) mostPowDF = spark.createDataFrame(mostPow) mostPowDF.toPandas().to_csv("mostPow.csv", encoding='utf-8', index=False) 5. 震源深度前500的地震当震源深度相同时,将震级更高的地震排在前面。 mostDeep = df.sort(df["Depth"].desc(), df["Magnitude"].desc()).take(500) mostDeepDF = spark.createDataFrame(mostDeep) mostDeepDF.toPandas().to_csv("mostDeep.csv", encoding='utf-8', index=False) 6. 震级与震源深度的关系将数据中的Depth和Magnitude拿出来,保存到文件中。 df.select(df["Magnitude"], df["Depth"]).toPandas().to_csv("powDeep.csv", encoding='utf-8', index=False)至此数据分析基本完成,将df也保存到csv文件中,用于数据可视化。 df.toPandas().to_csv("earthquake1.csv", encoding='utf-8', index=False) 四、 数据可视化可视化工具使用plotly,可以绘制可交互的图表,支持保存为html格式的文件方便查看,并可以完美兼容Jupyter Notebook。另外,用WordCloud绘制词云。 1. 数据总览首先将所有数据绘制到地图上。根据我们的常识,地震震级越高,地震的破坏力呈几何级数上升。为了画出更直观的图,使用exp(Magnitude)/100作为每个坐标的大小。画出的图都保存为html格式,方便查看。 import plotly.express as px import plotly.io as pio data = pd.read_csv('earthquake1.csv') fig1 = px.scatter_geo(data, color = data.Magnitude, color_continuous_scale = px.colors.sequential.Inferno, lon = data.Longitude, lat = data.Latitude, hover_name = data.Type, hover_data = ["Longitude", "Latitude", "Date", "Time", "Magnitude", "Depth" ], size = np.exp(data.Magnitude)/100, projection = "equirectangular", title = '1965-2016年全球重大地震' ) fig1.show() pio.write_html(fig1, 'fig1.html')地图支持放大缩小,且鼠标指针停在某个点上时可以显示hover_data中设置的信息。可以清楚地看出重大地震主要分布在地震带上,但是同时在一张图上显示两万多个点导致这张图非常卡顿,浏览起来很不舒服。Plotly还支持根据某个数据为一帧生成动画,下面根据年份信息逐帧生成图片,不会卡顿且更加美观。
既可以根点击左下角的播放暂停按键也可以直接拖动下方的进度条控制进度。
根据数据分析结果画出柱状图。 cntByYear = pd.read_csv('countByYear.csv') fig3 = px.bar(cntByYear, x = "Year", y = "count", text = "count", title = '1965-2016年每年发生重大地震的次数' ) fig3.show() pio.write_html(fig3, 'fig3.html')前十多年的地震次数明显少于之后的,可能是早期地震监测技术比较落后的原因。 cntByMonth = pd.read_csv('countByMonth.csv') fig4 = px.bar(cntByMonth, x = "Month", y = "count", text = "count", title = '1965-2016年每月发生重大地震的次数' ) fig4.show() pio.write_html(fig4, 'fig4.html')
首先画出中国境内发生的重大地震。 dataC = pd.read_csv('earthquakeC.csv') fig6 = px.scatter_geo(dataC, color = dataC.Magnitude, color_continuous_scale = px.colors.sequential.Inferno, lon = dataC.Longitude, lat = dataC.Latitude, scope = 'asia', center = {'lon': 105.73, 'lat': 29.6}, hover_name = dataC.Type, hover_data = ["Longitude", "Latitude", "Date", "Time", "Magnitude", "Depth" ], size = np.exp(dataC.Magnitude)/100, title = '1965-2016年中国境内重大地震' ) fig6.show() pio.write_html(fig6, 'fig6.html')
根据数据画出饼状图。 fig10 = px.pie(cntByType, names = "Type", values = "count", title = '1965-2016年世界范围内不同类型的地震占比' ) fig10.show() pio.write_html(fig10, 'fig10.html')
根据Depth和Magnitude画出散点图。 powAndDep = pd.read_csv('powDeep.csv') fig12 = px.scatter(powAndDep, x = "Depth", y = "Magnitude", title = '震级与震源深度的关系' ) fig12.show() pio.write_html(fig12, 'fig12.html')
|
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |