Spark重温笔记(一):一分钟部署PySpark环境,轻松上手Spark配置

您所在的位置:网站首页 spark重启 Spark重温笔记(一):一分钟部署PySpark环境,轻松上手Spark配置

Spark重温笔记(一):一分钟部署PySpark环境,轻松上手Spark配置

2024-07-12 10:01| 来源: 网络整理| 查看: 265

Spark学习笔记

前言:今天是温习 Spark 的第 1 天啦!主要梳理了Spark环境搭建,3种运行模式,以及spark入门知识点,任务提交方式,参数配置细节,以及启动和端口号等介绍,总结了很多自己的理解和想法,希望和大家多多交流,希望对大家有帮助!

Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!

喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"

文章目录 Spark学习笔记一、基础环境搭建1. pyspark+环境2. 三种运行方式(1) local模式(2) standlone模式——远程连接(3)HA模式——远程连接 3. Spark简介(1) 介绍spark(2) spark的yarn与deploy-mode(3) spark两种提交方式(4) Driver Program 和 Executor 参数配置(5) Spark服务启动(6) spark的重要端口号

一、基础环境搭建 1. pyspark+环境

创建一个conda的环境:用于pyspark本地运行

conda create --name Pyspark_Practice python=3.8.8 conda activate Pyspark_Practice 记得配置环境变量 PYSPARK_DRIVER_PYTHON=D:\anaconda\envs\Pyspark_Practice\python.exe PYSPARK_PYTHON=D:\anaconda\envs\Pyspark_Practice\python.exe jupyter notebook --ip 192.168.52.3 --port 8888 --allow-root

这会自动安装与你 python版本对应的最新的tensorflow

pip install -i https://mirrors.aliyun.com/pypi/simple/ --upgrade tensorflow

会安装与你环境python版本对应的 pytorch

conda install pytorch torchvision torchaudio cpuonly -c https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch/win-64/ source activate 可以进入conda环境 source deactivate 可以退出conda环境

更换下载源

1、conda: 打开Terminal,输入conda config --show channels 这里我们使用清华大学的镜像源。在命令行中复制以下命令,并按回车: conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/ 完成后,再输入conda config --show channels,可以发现,镜像源已经被改为清华源了。 2、pip pip是一种通用的Python包管理工具,我们在conda环境下也可以使用pip来进行Python包的管理。 在命令行中运行: pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple 会生成一个pip.ini的文件,这就是pip镜像源的配置文件。 我们可以使用pip config list命令,查看pip镜像源地址。 2. 三种运行方式 (1) local模式

数据源 word.txt:

hello you Spark Flink hello me hello she Spark _01_FirstSparkPro.py # -*- coding: utf-8 -*- # Program function: 本地测试spark简单wordcount程序 from pyspark import SparkContext,SparkConf if __name__ == "__main__": # 1. 首先创建SparkContext的上下文环境 conf = SparkConf().setAppName("FirstSpark").setMaster("local[*]") sc = SparkContext(conf=conf) sc.setLogLevel("WARN") # 2.从外部数据源读取数据 fileRDD = sc.textFile("D:\PythonProject\Bigdata_Pyspark3.1.2\PySpark-SparkBase_3.1.2\data\input\word.txt") print(type(fileRDD)) print(fileRDD.collect()) # 3.执行flatmap进行数据扁平化 flat_mapRDD = fileRDD.flatMap(lambda words:words.split(" ")) print(type(flat_mapRDD)) print(flat_mapRDD.collect()) # 4.执行map转化操作,得到(word,1) rdd_mapRDD = flat_mapRDD.map(lambda word:(word,1)) print(type(rdd_mapRDD)) print(rdd_mapRDD.collect()) # 5.reduceByKey将相同的key的value数据累加操作 resultRDD = rdd_mapRDD.reduceByKey(lambda x,y:x+y) print(type(resultRDD)) # 6.输出数据 res_rdd_col2 = resultRDD.collect() # 7.输出到控制台 for line in res_rdd_col2: print(line) # 8.输出到文件:不用新建文件夹,输出结果是part文件和success文件 #resultRDD.saveAsTextFile("D:\PythonProject\Bigdata_Pyspark3.1.2\PySpark-SparkBase_3.1.2\data\output") # 9.针对values单词进行词频统计 print("=========================sort===============================") #print(resultRDD.sortBy(lambda x:x[1], ascending=False).take(3))——获取前3位 #print(resultRDD.sortBy(lambda x: x[1], ascending=False).top(3))——获取后三位 print('停止 PySpark SparkSession 对象') sc.stop() (2) standlone模式——远程连接 # 配置上下文环境时 conf = SparkConf().setAppName("StandloneSpark").setMaster("spark://node1:7077") # 读取文件时: # 路径不指定协议 默认使用hdfs # 读取hdfs文件: fileRDD = sc.textFile("hdfs://node1:9820/word.txt") # 读取linux本地文件:需要三个节点备份: fileRDD = sc.textFile("/export/data/pyspark_workplace/PySpark-SparkBase_3.1.2/data/input/word.txt") (3)HA模式——远程连接 # 1、首先创建SparkContext上下文环境 conf = SparkConf().setAppName("StandloneSpark").setMaster("spark://node1:7077,node2:7077") 3. Spark简介 (1) 介绍spark

Spark 是一种快速、通用、可扩展的大数据分析引擎,2009 年诞生于加州大学伯克利分校。

具有以下五大特点:

1-速度快: 其一、Spark处理数据时,可以将中间处理结果数据存储到内存中其二、spark job调度以DAG方式,每个任务Task以线程Thread方式,而不是mapreduce以进程process方式 2-易于使用: Spark 的版本已经更新到 Spark 3.1.2(截止日期2021.06.01),支持了包括 Java、Scala、Python 、R和SQL语言在内的多种语言。为了兼容Spark2.x企业级应用场景,Spark仍然持续更新Spark2版本。 3-通用性强: SparkSQL:提供结构化的数据处理方式SparkStreaming:处理流式处理任务MLlib:机器学习算法库GraphX:提供图形和图形并行化计算 4-运行方式: 对于数据源而言,Spark 支持从HDFS、HBase、 Kafka 等多种途径获取数据。 (2) spark的yarn与deploy-mode 1-提交yarn,不走deploy-mode SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master yarn \ --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \ --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \ ${SPARK_HOME}/examples/src/main/python/pi.py \ 10 2-client模式:学习测试,driver运行在client的sparksubmit进程中,结果在客户端显示 SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \ --deploy-mode client \ --driver-memory 512m \ --executor-memory 512m \ --num-executors 1 \ --total-executor-cores 2 \ --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \ --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \ ${SPARK_HOME}/examples/src/main/python/pi.py \ 10 3-cluster模式:生产环境使用,driver程序在yarn集群,运行结果客户端没有显示 SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \ --deploy-mode cluster \ --driver-memory 512m \ --executor-memory 512m \ --num-executors 1 \ --total-executor-cores 2 \ --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \ --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \ ${SPARK_HOME}/examples/src/main/python/pi.py \ 10 (3) spark两种提交方式

spark-shell:交互式scale命令行,bin/spark-shell --master spark://node1:7077

spark-submit:

–master local[2]

${SPARK_HOME}/bin/spark-submit \ --master local[2] \ ${SPARK_HOME}/examples/src/main/python/pi.py \ 10

–master spark://node1.itcast.cn:7077 \

${SPARK_HOME}/bin/spark-submit \ --master spark://node1.itcast.cn:7077 \ --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \ --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \ ${SPARK_HOME}/examples/src/main/python/pi.py \ 10

–master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \

${SPARK_HOME}/bin/spark-submit \ --master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \ --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \ --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \ ${SPARK_HOME}/examples/src/main/python/pi.py \ 10 (4) Driver Program 和 Executor 参数配置 SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \ --deploy-mode cluster \ --driver-memory 512m \ # 指定driver program JVM进程内存大小,默认为1G --executor-memory 512m \ # 指定Executor 运行所需内存大小 --num-executors 1 \ # 表示在yarn集群下,Executor的个数,默认值为2 --total-executor-cores 2 \ # 所有任务的总 CPU cores --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \ --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \ ${SPARK_HOME}/examples/src/main/python/pi.py \ 10 (5) Spark服务启动 集群启动 /export/server/spark/sbin/start-all.sh 本地启动:python交互式 /export/servers/spark/bin/pyspark --master local[4] 本地启动:scale交互式 /epxort/servers/spark/bin/spark-shell --master spark://node1:7077 /epxort/servers/spark/bin/spark-shell --master spark://node1:7077,node2:7077 启动历史日志服务器 sbin/mr-jobhistory-daemon.sh start historyserver 端口:18080 (6) spark的重要端口号 1-Spark Master Web UI:4040 这个端口用于显示 Spark Master 的 Web 用户界面通过这个界面查看和管理 Spark 集群的状态。 Spark Worker 和 Spark Driver 与 Master 通信端口:7077 这个端口用于 Spark Worker 和 Spark Driver 向 Spark Master 发送心跳和任务状态信息。 Spark EventLog 事件日志端口:4041 这个端口用于 Spark 事件日志的记录和查看,包括任务的创建、提交、完成等事件。 Spark History Server Web UI端口:18080 这个端口用于显示 Spark History Server 的 Web 用户界面,你可以通过这个界面查看已完成的任务的历史信息。


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3