pyspark 常用操作,以及 python shell 和 pyspark shell 语法对比

您所在的位置:网站首页 pyspark语法PDF pyspark 常用操作,以及 python shell 和 pyspark shell 语法对比

pyspark 常用操作,以及 python shell 和 pyspark shell 语法对比

2024-07-12 14:29| 来源: 网络整理| 查看: 265

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

更多实例演示:https://github.com/datadevsh/pyspark-api

1.python环境

包括 jupyter 、Python shell 、pycharm 。

1.1.获取sc对象 from pyspark import SparkConf, SparkContext val conf = new SparkConf().setMaster("local[3]").setAppName("als").set("spark.executor.memory","10g") val sc = SparkContext.getOrCreate(conf) sc.setLogLevel("ERROR") # .set('spark.driver.host','txy').set('spark.local.ip','txy') # val spark = SparkSession.builder.getOrCreate() 1.2.读取文件 lines = sc.textFile("D:/ML/python-design/ml-10M100K/ratings.dat") 1.3. 切割字符串 parts = lines.map(lambda row: row.split("::")) 1.4.创建dataframe / 切割数据集

前提:

from pyspark.sql import Row ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]), rating=float(p[2]))) (training, test) = ratingsRDD.randomSplit([0.8, 0.2]) 1.5. 模型对象初始化和训练 from pyspark.ml.recommendation import ALS model = ALS.train(training, rank=50, iterations=10, lambda_=0.01) 1.6.读取文件夹目录下所有文件 import os flag = True if os.path.isdir(inputFile): # 如果路径是文件夹 files = os.listdir(inputFile) # 得到文件夹下的所有文件名称 for file in files: # 遍历文件夹 if (flag): lines = sc.textFile(inputFile + file) flag = False else: lines = sc.textFile(inputFile + file).union(lines) # 把所有文件做并集 else: lines = sc.textFile(inputFile) # 如果路径是一个文件 1.7.pyspark DataFrame遍历 1.7.1.遍历 for i in arr: print(i) 1.7.2.遍历ResultIterable 1.7.2.1.父集合是list类型 user_item_hist # list类型 [('uhf34sdcfe3', ), ('dsfcds2332f', )]

遍历ResultIterable 里面的值

for x in user_item_hist: print(x[0],list(x[1])) 1.7.2.2.父集合是PythonRDD类型 1. 写一个文件 user.py class User: def __init__(self, line): self.user_id = line[0] self.location = line[1] 2.引入 from user import User def create_user(line): user = User(line) return user 3.转换、遍历 for user in user_item_pairs.map(lambda entry: create_user(entry)).collect(): print(user.user_id,list(user.location)) 2.pyspark shell环境

可见,这里的spark对象来自SparkSession,所以和来自SparkContext的sc用法不太一样。

1.1. 获取sc对象

可以直接使用“spark”来操作

1.2.读取文件 lines = spark.read.text("ratings.dat").rdd 1.3. 切割字符串 parts = lines.map(lambda row: row.value.split("\001")) 1.4.创建dataframe / 切割数据集

前提:

from pyspark.sql import Row ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]), rating=float(p[2]))) ratings = spark.createDataFrame(ratingsRDD) (training, test) = ratings.randomSplit([0.8, 0.2]) 1.5. 模型对象初始化和训练 from pyspark.ml.recommendation import ALS als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop") model = als.fit(training)


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3