Dynamically creating a CSV file header in Spark Scala


2023-05-05 06:28 | Source: compiled from the web

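In this case you can't use the CSV data source, but you can parse the file yourself without much difficulty. The basic steps are: read the metadata file as a single text blob, then, in the data file, separate the data rows from the header row, and finally apply the schema.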

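I made some assumptions about your delimiters. One thing to keep in mind is that once the DataFrame is loaded, you should not rely on any ordering of its rows, because you don't know which data was loaded on which worker. The code below therefore tries to tell data rows from metadata rows by their content, so you may need to adjust those rules.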
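The content-based rules can also be written as a small pure-Scala function, which makes them easy to adjust and check without a cluster. This is only a minimal sketch: `LineKind` and `LineRules` are hypothetical names, and the field names and line prefixes simply mirror the ones used in this answer's code, so replace them with whatever your files contain.

```scala
// Hypothetical helper: classify a raw line by its content alone, never by its position.
sealed trait LineKind
case object Header extends LineKind
case object Special extends LineKind
case object Data extends LineKind

object LineRules {
  // Column names expected in the header line (assumed, taken from this answer's example).
  val fields: Seq[String] = Seq("user_id", "has_insurance", "postal_code", "city")

  // Prefixes of lines that are neither header nor data (assumed; adjust to your files).
  val specialPrefixes: Seq[String] = Seq("Vendor 1", "data file format")

  def classify(line: String): LineKind =
    if (fields.forall(f => line.contains(f))) Header             // mentions every field name
    else if (specialPrefixes.exists(p => line.startsWith(p))) Special
    else Data                                                     // everything else is a data row
}
```

Keeping the rules in one place like this means a new vendor format only requires changing `fields` and `specialPrefixes`.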

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, split}
import spark.implicits._  // assumes a SparkSession named `spark`, as in spark-shell

def loadVendor(dataPath: String, metaPath: String): DataFrame = {
  val df = spark.read.text(dataPath)

  // First read the metadata file; wholeTextFiles lets us get it all
  // as a single string so we can parse it locally.
  // Each metadata line is expected to look like "column_name,type".
  val metaText = spark.sparkContext.wholeTextFiles(metaPath).first()._2
  val metaLines = metaText.
    split("\n").
    map(_.trim).
    filter(_.nonEmpty).
    map(_.split(",").map(_.trim))

  // Identify the header as the line that contains all the field names
  val fields = Seq("user_id", "has_insurance", "postal_code", "city")
  val headerDf = fields.foldLeft(df)((df2, fname) => df2.filter($"value".contains(fname)))
  val headerLine = headerDf.first().getString(0)
  val header = headerLine.trim.split("\\s+")

  // Identify data rows as ones that aren't special lines or the header
  val data = df.
    filter($"value" =!= headerLine).
    filter(!$"value".startsWith("Vendor 1")).
    filter(!$"value".startsWith("data file format"))

  // Split the data fields on the separator and assign column names.
  // Assumes any run of whitespace is your separator.
  val rows = data.select(split($"value", raw"\s+") as "fields")
  val named = header.zipWithIndex.map { case (f, idx) => $"fields".getItem(idx).alias(f) }
  val table = rows.select(named: _*)

  // Cast to the right types
  val castCols = metaLines.map { case Array(cname, ctype) => col(cname).cast(ctype) }
  val typed = table.select(castCols: _*)

  // Return the columns sorted by name, so union'ing multiple DFs will line up
  typed.select(table.columns.sorted.map(col): _*)
}
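The metadata parsing step in `loadVendor` can be tried locally without Spark. The sketch below assumes the metadata file holds one `column,type` pair per line, which is what the `case Array(cname, ctype)` pattern expects. The sample content is an assumption reconstructed from the schema printed in this answer, and `MetaParse` is a hypothetical name. Unlike a bare pattern match, it skips blank or malformed lines instead of failing on them.

```scala
object MetaParse {
  // Assumed metadata content: one "column,type" pair per line.
  val metaText: String =
    """user_id,string
      |has_insurance,boolean
      |postal_code,string
      |city,string
      |""".stripMargin

  // Parse the text into (name, type) pairs, dropping blank or malformed lines.
  def parse(text: String): Seq[(String, String)] =
    text.split("\n").toSeq
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(_.split(",").map(_.trim))
      .collect { case Array(name, ctype) => (name, ctype) }
}
```

Each resulting pair corresponds to one `col(name).cast(ctype)` expression in the Spark code.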

Here is the data printed out:

scala> df.show
+-------+-------------+-----------+-------+
|   city|has_insurance|postal_code|user_id|
+-------+-------------+-----------+-------+
|Newyork|         true|      20001|    101|
| Boston|        false|      40001|    102|
+-------+-------------+-----------+-------+

Here is the schema printed out:

scala> df.printSchema
root
 |-- city: string (nullable = true)
 |-- has_insurance: boolean (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- user_id: string (nullable = true)

