Spark读取HDFS或者AFS等文件系统文件

您所在的位置:网站首页 spark读取hdfs文件 Spark读取HDFS或者AFS等文件系统文件

Spark读取HDFS或者AFS等文件系统文件

2023-03-28 03:53| 来源: 网络整理| 查看: 265

                        Spark读取HDFS或者AFS等文件系统文件

Spark读取文件有很多方法,我这里主要介绍一下读取非结构化的文件的两种方式,针对多文件读取,单文件读取也是一样的。

方案一:spark的textFile方法,也是最简单的方案,支持通配符,简单好用

String afsFilePath="afs://afs.yun.com/app/file/*/sss*"; // String afsFilePath="afs://afs.yun.com/app/file/text/text.txt"; // String afsFilePath="afs://afs.yun.com/app/file/text/*.log"; // 读取之后的是String类型的Dataframe对象,一般需要转换为RDD去处理更方便 // spark会自动去循环目录下的所有文件,可以通过通配符的方式读取你想要的文件 // 通配符的方式还有很多,我这里就不一一概述了 Dataset stringDataset = spark.read().textFile(afsFilePath); JavaRDD logLinesStr = stringDataset.javaRDD(); JavaRDD logLinesStr = spark.read().textFile(afsFilePath).javaRDD(); 复制代码

Hadoop支持的通配符与Unix bash相同

表1 通配符及其含义

通配符名称匹配* 星号 匹配0或多个任意字符? 问号匹配单一字符[ab]  字符类匹配{a,b}集合中的一个字符[^ab] 非字符类匹配非{a,b}集合中的一个字符[a-z]字符范围匹配一个在{a,z}范围内的字符(包含az),a在字典顺序上要小于z[^a-z] 非字符范围 匹配一个不在{a,b}范围内的字符(包含ab),a在字典顺序上要小于b{a,b} 或选择匹配包含a或b中的一个表达式

方案二:使用循环目录的方式去调用text方法,这种方法适用于需要读取的文件名比较复杂,或者没有规则,而且里面有很多不是你想要的那种文件啥的。

原理很简单,就是传递一个目录,然后去list目录下的文件,然后判断是否是文件下,是就递归,不是就获取文件绝对路径,因为text方法只支持绝对文件路径读取单个文件。

代码写的不是很好哈,有时间了在优化一下,主要就是看一下怎么用就好。不读取带@的,是因为我的系统里面带这个文件名的都是通知文件,不是我想要的。

// 这段代码主要是读取afs上的文件,hdfs的也一样 AFS_URL = "afs://xxx.xxx.xxx"; public Dataset readFilesByPath(String path) { FileSystem fileSystem = null; Configuration conf = new Configuration(); conf.set("fs.defaultFS", AFS_URL); List filePathList = new ArrayList(); try { fileSystem = FileSystem.get(conf); System.out.println("filePath:" + path); getFilesInPath(fileSystem, filePathList, path); } catch (IOException e) { e.printStackTrace(); } Dataset stringDataset = null; for (Path filePath : filePathList) { String sulotionPath = filePath.toString(); stringDataset = stringDataset == null ? spark.read().text(sulotionPath) : stringDataset.union(spark.read().text(sulotionPath)); } return stringDataset; } /** * 递归获取指定目录下的所有文件绝对路径 */ private void getFilesInPath(FileSystem fileSystem, List filesInPath, String filePath) throws IOException { Path path = new Path(filePath); FileStatus[] files = fileSystem.listStatus(path); for (FileStatus file : files) { if (file.isDirectory()) { getFilesInPath(fileSystem, filesInPath, file.getPath().toString()); } else { // 不解析文件名带有@符号的 if (file.getPath().getName().contains("@")) { continue; } System.out.println(file.getPath().toString()); filesInPath.add(file.getPath()); } } } 复制代码

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3