Pyspark:读取本地文件和HDFS文件

您所在的位置:网站首页 jupyter上传本地文件 Pyspark:读取本地文件和HDFS文件

Pyspark:读取本地文件和HDFS文件

2023-09-19 02:51| 来源: 网络整理| 查看: 265

1.读取本地文件

首先需要在目录“/usr/local/spark/mycode/wordcount”下,建好一个word.txt:

hadoop@rachel-virtual-machine:/usr/local/spark$ ./bin/pyspark ./bin/pyspark: 行 45: python: 未找到命令 Python 3.6.8 (default, Jan 14 2019, 11:02:34) [GCC 8.0.1 20180414 (experimental) [trunk revision 259383]] on linux Type "help", "copyright", "credits" or "license" for more information. 2019-08-28 16:29:50 WARN Utils:66 - Your hostname, rachel-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.80.128 instead (on interface ens33) 2019-08-28 16:29:50 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address 2019-08-28 16:30:02 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.3.3 /_/ Using Python version 3.6.8 (default, Jan 14 2019 11:02:34) SparkSession available as 'spark'. >>> textFile = sc.textFile('file:///usr/local/spark/mycode/wordcount/word.txt')

sc.textFile()中的这个textFile是sc的一个方法名称,这个方法用来加载文件数据。这两个textFile不是一个东西,不要混淆。

实际上,前面的变量textFile,你完全可以换个变量名称,比如, line=sc.textFile(“file:///usr/local/spark/mycode/wordcount/word.txt”)。这里使用相同名称,就是有意强调二者的区别。

注意,要加载本地文件,必须采用“file:///”开头的这种格式。执行上上面这条命令以后,并不会马上显示结果,因为,Spark采用惰性机制,只有遇到“行动”类型的操作,才会从头到尾执行所有操作。

下面我们执行一条“行动”类型的语句,就可以看到结果:

>>> textFile.first() 'Type "help", "copyright", "credits" or "license" for more information.'

first()是一个“行动”(Action)类型的操作,会启动真正的计算过程,从文件中加载数据到变量textFile中,并取出第一行文本。你可以从这些结果信息中,找到word.txt文件中的第一行的内容。

正因为Spark采用了惰性机制,在执行转换操作的时候,即使我们输入了错误的语句,pyspark也不会马上报错,而是等到执行“行动”类型的语句时启动真正的计算,那个时候“转换”操作语句中的错误就会显示出来,比如:

>>> textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")

上面我们使用了一个根本就不存在的word123.txt,执行上面语句时,pyspark根本不会报错,因为,没有遇到“行动”类型的first()操作之前,这个加载操作时不会真正执行的。然后,我们执行一个“行动”类型的操作first(),如下:

如何把textFile变量中的内容再次写回到另外一个目录wordback中:

>>> textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt") >>> textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")

上面的saveAsTextFile()括号里面的参数是保存文件的路径,不是文件名。saveAsTextFile()是一个“行动”(Action)类型的操作,所以,马上会执行真正的计算过程,从word.txt中加载数据到变量textFile中,然后,又把textFile中的数据写回到本地文件目录“/usr/local/spark/mycode/wordcount/writeback/”下面。

 

2.读取HDFS文件

为了能够读取HDFS中的文件,请首先启动Hadoop中的HDFS组件。(由于用不到MapReduce组件,所以,不需要启动MapReduce或者YARN)

hadoop@rachel-virtual-machine:/usr/local/hadoop$ ./sbin/start-dfs.sh Starting namenodes on [localhost] hadoop@localhost's password: localhost: starting namenode, logging to /usr/local/hadoop/logs/hadoop-hadoop-namenode-rachel-virtual-machine.out hadoop@localhost's password: localhost: starting datanode, logging to /usr/local/hadoop/logs/hadoop-hadoop-datanode-rachel-virtual-machine.out Starting secondary namenodes [0.0.0.0] [email protected]'s password: 0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop/logs/hadoop-hadoop-secondarynamenode-rachel-virtual-machine.out hadoop@rachel-virtual-machine:/usr/local/hadoop$ hadoop@rachel-virtual-machine:/usr/local/hadoop$ jps 3450 Jps 1915 SparkSubmit 2475 DataNode 2731 SecondaryNameNode 2318 NameNode

下面我们使用命令查看一下HDFS文件系统中的目录和文件:

hadoop@rachel-virtual-machine:/usr/local/hadoop$ ./bin/hdfs dfs -ls . ls: `.': No such file or directory

这是因为在HDFS文件系统中,还没有为当前Linux登录用户创建目录,使用下面命令创建:

hadoop@rachel-virtual-machine:/usr/local/hadoop$ ./bin/hdfs dfs -mkdir -p /user/hadoop

HDFS文件系统为Linux登录用户开辟的默认目录是“/user/用户名”(注意:是user,不是usr),因为本机是用户名hadoop登录Linux系统,所以,上面创建了“/user/hadoop”目录,再次强调,这个目录是在HDFS文件系统中,不在本地文件系统中。

hadoop@rachel-virtual-machine:/usr/local/hadoop$ ./bin/hdfs dfs -ls . hadoop@rachel-virtual-machine:/usr/local/hadoop$

上面命令中,最后一个点号“.”,表示要查看Linux当前登录用户hadoop在HDFS文件系统中与hadoop对应的目录下的文件,也就是查看HDFS文件系统中“/user/hadoop/”目录下的文件,所以,下面两条命令是等价的:

./bin/hdfs dfs -ls . ./bin/hdfs dfs -ls /user/hadoop

如果要查看HDFS文件系统根目录下的内容,需要使用下面命令:

hadoop@rachel-virtual-machine:/usr/local/hadoop$ ./bin/hdfs dfs -ls / Found 1 items drwxr-xr-x - hadoop supergroup 0 2019-08-28 16:57 /user

下面,我们把本地文件系统中的“/usr/local/spark/mycode/wordcount/word.txt”上传到分布式文件系统HDFS中(放到hadoop用户目录下):

hadoop@rachel-virtual-machine:/usr/local/hadoop$ ./bin/hdfs dfs -put /usr/local/spark/mycode/wordcount/word.txt . hadoop@rachel-virtual-machine:/usr/local/hadoop$ ./bin/hdfs dfs -ls . Found 1 items -rw-r--r-- 1 hadoop supergroup 625 2019-08-28 17:03 word.txt

可以看到,确实多了一个word.txt文件,我们使用cat命令查看一个HDFS中的word.txt文件的内容,命令如下:

hadoop@rachel-virtual-machine:/usr/local/hadoop$ ./bin/hdfs dfs -cat ./word.txt Type "help", "copyright", "credits" or "license" for more information. 2019-08-28 15:27:12 WARN Utils:66 - Your hostname, rachel-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.80.128 instead (on interface ens33) 2019-08-28 15:27:12 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address 2019-08-28 15:27:23 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Welcome to

上面命令执行后,就会看到HDFS中word.txt的内容了。

回到pyspark窗口,编写语句从HDFS中加载word.txt文件:

>>> textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") >>> textFile.first() 'Type "help", "copyright", "credits" or "license" for more information.'

执行上面语句后,就可以看到HDFS文件系统中(不是本地文件系统)的word.txt的第一行内容了。

需要注意的是,sc.textFile(“hdfs://localhost:9000/user/hadoop/word.txt”)中,“hdfs://localhost:9000/”是前面介绍Hadoop安装内容时确定下来的端口地址9000。实际上,也可以省略不写,如下三条语句都是等价的:

>>> textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") >>> textFile = sc.textFile("/user/hadoop/word.txt") >>> textFile = sc.textFile("word.txt")

下面,我们再把textFile的内容写回到HDFS文件系统中(写到hadoop用户目录下):

>>> textFile.saveAsTextFile("writeback")

执行上面命令后,文本内容会被写入到HDFS文件系统的“/user/hadoop/writeback”目录下,我们可以切换到Linux Shell命令提示符窗口查看一下:

hadoop@rachel-virtual-machine:/usr/local/hadoop$ ./bin/hdfs dfs -ls . Found 2 items -rw-r--r-- 1 hadoop supergroup 625 2019-08-28 17:03 word.txt drwxr-xr-x - hadoop supergroup 0 2019-08-28 17:08 writeback

下面我们查看该目录下有什么文件:

hadoop@rachel-virtual-machine:/usr/local/hadoop$ ./bin/hdfs dfs -ls ./writeback Found 3 items -rw-r--r-- 1 hadoop supergroup 0 2019-08-28 17:08 writeback/_SUCCESS -rw-r--r-- 1 hadoop supergroup 334 2019-08-28 17:08 writeback/part-00000 -rw-r--r-- 1 hadoop supergroup 291 2019-08-28 17:08 writeback/part-00001

执行结果中如上。我们使用下面命令输出part-00000文件的内容(注意:part-00000里面有五个零):

hadoop@rachel-virtual-machine:/usr/local/hadoop$ ./bin/hdfs dfs -cat ./writeback/part-00000 Type "help", "copyright", "credits" or "license" for more information. 2019-08-28 15:27:12 WARN Utils:66 - Your hostname, rachel-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.80.128 instead (on interface ens33) 2019-08-28 15:27:12 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address

 

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3