Pyspark

您所在的位置:网站首页 dysmantle追捕清道夫任务 Pyspark

Pyspark

2023-05-18 01:31| 来源: 网络整理| 查看: 265

Pyspark

注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。 今天继续和大家分享一下Pyspark_SQL6 #博学谷IT学习技术支持

文章目录 Pyspark前言一、新零售综合小案例1.数据转换和清洗2.数据统计需求 二、Spark SQL的运行机制总结

前言

今天继续分享Pyspark_SQL6。

一、新零售综合小案例 1.数据转换和清洗

第一步:过滤无效数据,转换时间格式

from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession import pyspark.sql.functions as F if __name__ == '__main__': print("spark xls clear") spark = SparkSession.builder.appName("spark xls clear").master("local[*]") \ .config('spark.sql.shuffle.partitions', 4) \ .getOrCreate() df = spark.read.format("csv") \ .option("header", True).option("inferSchema", True).option("sep", ",") \ .load("file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/E_Commerce_Data.csv") # 1 - 将客户ID中不为0的数据保留, 为0的数据过滤掉 # 2 - 将商品描述不为空的数据保留, 为空的数据过滤掉 df = df.where("CustomerID != 0").where("Description != ''") print(df.count()) # 3- 将日期格式转换为 yyyy-MM-dd HH:mm 原格式: 12/1/2010 8:26 转换为:2010-12-01 08:26 df = df.withColumn("InvoiceDate", F.from_unixtime(F.unix_timestamp("InvoiceDate", "M/d/yyyy H:mm"), "yyyy-MM-dd HH:mm")) df.show() df.write.mode("overwrite").format("csv").option("header", True).option("sep", "|").option("encoding", 'GBK') \ .save("hdfs://node1:8020/xls/output") spark.stop()

清洗后的数据为:

2.数据统计需求

需求一: 统计各个国家有多少的客户量 需求二: 统计销量最高的10个国家 需求三: 各个国家的总销售额分布情况 需求四: 销售最高的10个商品 需求五: 商品描述的热门关键词TOP300 需求六: 统计退货订单数最多的10个国家 需求七: 商品的平均单价与销售的关系 需求八: 月销售额随时间的变化的趋势 需求九: 日销售随时间的变化趋势 需求十: 各个国家的购买订单量和退货订单量关系

from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession import pyspark.sql.functions as F def method1(): # 需求一: 统计各个国家有多少的客户量 df.select("CustomerID", "Country").groupBy("Country").agg( F.countDistinct("CustomerID").alias("c_cnt") ).orderBy("c_cnt", ascending=False).show() def method2(): # 需求二: 统计销量最高的10个国家 df.select("Country", "Quantity").groupBy("Country").agg( F.sum("Quantity").alias("q_sum") ).orderBy("q_sum", ascending=False).limit(10).show() def method3(): # 需求三: 各个国家的总销售额分布情况 df.select("Country", "Quantity", "UnitPrice").withColumn("totalPrice", F.col("Quantity") * F.col("UnitPrice")) \ .groupBy("Country").agg( F.round(F.sum("totalPrice"), 2).alias("t_sum") ).orderBy("t_sum", ascending=False).show() def method4(): # 需求四: 销售最高的10个商品 df.where("InvoiceNo not like 'C%'").select("StockCode", "Quantity").groupBy("StockCode").agg( F.sum("Quantity").alias("s_q_sum") ).orderBy("s_q_sum", ascending=False).limit(10).show() def method5(): # 需求五: 商品描述的热门关键词TOP300 df.select(F.explode(F.split("Description", " ")).alias("words")).groupBy("words").agg( F.count("words").alias("w_cnt") ).orderBy("w_cnt", ascending=False).limit(300).show() def method6(): # 需求六: 统计退货订单数最多的10个国家 df.where("InvoiceNo like 'C%'").select("Country").groupBy("Country").agg( F.count("Country").alias("c_cnt") ).orderBy("c_cnt", ascending=False).limit(10).show() def method7(): # 需求七: 商品的平均单价与销售的关系 df.where("InvoiceNo not like 'C%'").select("StockCode", "Quantity", "UnitPrice").groupBy("StockCode").agg( F.round(F.avg("UnitPrice"), 2).alias("avg_price"), F.sum("Quantity").alias("q_sum") ).orderBy("avg_price", "q_sum", ascending=False).show() def method8(): # 需求八: 月销售额随时间的变化的趋势 df.where("InvoiceNo not like 'C%'").groupby(F.substring('InvoiceDate', 1, 7).alias('month')).agg( F.round(F.sum(F.col('Quantity') * F.col('UnitPrice')), 2).alias('total_price') ).orderBy('month').show() def method9(): # 需求九: 日销售随时间的变化趋势 df.where("InvoiceNo not like 'C%'").select("InvoiceDate", "Quantity", "UnitPrice") \ .withColumn("InvoiceDate", F.substring("InvoiceDate", 9, 2).alias("day")).groupBy("InvoiceDate").agg( F.round(F.sum(F.col("Quantity") * F.col("UnitPrice"))).alias("day_total_price") ).orderBy("InvoiceDate").show() def method10(): # 需求十: 各个国家的购买订单量和退货订单量关系 df.select("Country", "InvoiceNo").groupBy("Country").agg( F.countDistinct('InvoiceNo').alias('o_cnt'), F.countDistinct(F.expr("if(InvoiceNo like 'C%',InvoiceNo,NULL)")).alias('t_cnt') ).show() if __name__ == '__main__': print("spark xls analysis") spark = SparkSession.builder.appName("spark xls analysis").master("local[*]") \ .config('spark.sql.shuffle.partitions', 4) \ .getOrCreate() df = spark.read.format("csv") \ .option("header", True).option("inferSchema", True).option("sep", "|").option("encoding", "GBK") \ .load("hdfs://node1:8020/xls/output") df.printSchema() df.show() method1() method2() method3() method4() method5() method6() method7() method8() method9() method10() spark.stop() 二、Spark SQL的运行机制

回顾: Job的调度流程 1- Driver程序首先会先创建SparkContext对象, 同时在其底层会创建DAGScheduler 和 TaskScheduler

2- 当Spark程序遇到一个action算子后, 就会触发一个Job任务, 首先通过DAGScheduler形成DAG执行流程图, 划分Stage, 并且确定每个stage中需要运行多少个Task线程, 将每个阶段的Task线程放置到对应的TaskSet的集合中, 最后将各个阶段的TaskSet提交到TaskScheduler

3- TaskScheduler接收到各个阶段的TaskSet后, 然后依次的进行调度执行, 确定每个Task线程需要运行到那个executor(尽量保证负载均衡)

4- 后续Driver负责整个任务进行监控管理即可…

​ Spark SQL底层依然是运行的Spark RDD的程序, 所以说Spark RDD程序运行流程, 在Spark SQL中依然是存在的,只不过在这个流程的基础上增加了从SQL翻译为RDD的过程

​ 所以: Spark SQL的运行机制, 其实就是在描述如何将Spark SQL翻译为RDD程序

1- sparkSQL底层解析是有RBO 和 CBO优化完成的 2- RBO是基于规则优化, 对于SQL或DSL的语句通过执行引擎得到未执行逻辑计划, 在根据元数据得到逻辑计划, 之后加入列值裁剪或谓词下推等优化手段形成优化的逻辑计划 3- CBO是基于优化的逻辑计划得到多个物理执行计划, 根据代价函数选择出最优的物理执行计划 4- 通过codegenaration代码生成器完成RDD的代码构建 5- 底层依赖于DAGScheduler 和TaskScheduler 完成任务计算执行

所以官网建议比起Spark RDD ,更推荐Spark SQL。原因在于底层优化的更好。

总结

今天主要和大家分享了一个小练习,和Spark SQL的运行机制。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3