
Suppose we have the following text file (the output of a df.show() command). How can the output of the show operator be read back into a Dataset?

+----+---------+--------+
|col1|     col2|    col3|
+----+---------+--------+
|   1|pi number|3.141592|
|   2| e number| 2.71828|
+----+---------+--------+

Now I would like to read/parse it back as a DataFrame/Dataset. What would be the slickest way to do that?

P.S. I am interested in solutions for both Scala and PySpark, which is why both tags are used.

Answers


UPDATE: Using the "univocity" parser library, I could drop the line where I stripped the whitespace from the column names:

Scala:

// read a fixed-width table produced by Spark's show() back into a DataFrame
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
    val t = spark.read
       .option("header", "true")
       .option("inferSchema", "true")
       .option("delimiter", "|")
       .option("parserLib", "UNIVOCITY")
       .option("ignoreLeadingWhiteSpace", "true")
       .option("ignoreTrailingWhiteSpace", "true")
       .option("comment", "+")
       .csv(filePath)
    // drop the empty columns created by the leading/trailing '|' delimiters
    t.select(t.columns.filterNot(_.startsWith("_c")).map(t(_)): _*)
}

PySpark:

def read_spark_output(file_path):
    t = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("delimiter", "|") \
        .option("parserLib", "UNIVOCITY") \
        .option("ignoreLeadingWhiteSpace", "true") \
        .option("ignoreTrailingWhiteSpace", "true") \
        .option("comment", "+") \
        .csv(file_path)
    # drop the empty columns created by the leading/trailing '|' delimiters
    return t.select([c for c in t.columns if not c.startswith("_")])

Usage example:

scala> val df = readSparkOutput("file:///tmp/spark.out") 
df: org.apache.spark.sql.DataFrame = [col1: int, col2: string ... 1 more field] 

scala> df.show 
+----+---------+--------+
|col1|     col2|    col3|
+----+---------+--------+
|   1|pi number|3.141592|
|   2| e number| 2.71828|
+----+---------+--------+


scala> df.printSchema 
root 
 |-- col1: integer (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = true)
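
Since the question asks for PySpark as well, here is a minimal usage sketch for the Python helper above (it assumes an active SparkSession named spark and the same table text saved to /tmp/spark.out; with identical options, the inferred schema should match the Scala run above):

# sketch only: `spark` is assumed to be an active SparkSession,
# and read_spark_output() is the helper defined earlier
df = read_spark_output("file:///tmp/spark.out")
df.show()
df.printSchema()   # expected to infer col1 as int, col2 as string, col3 as double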

Old answer:

Here is my attempt in Scala (Spark 2.2):

// read a fixed-width table produced by Spark's show()
val t = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("comment", "+")
    .csv("file:///temp/spark.out")
// drop the empty columns created by the leading/trailing '|' delimiters
val cols = t.columns.filterNot(c => c.startsWith("_c")).map(a => t(a))
// strip whitespace from the column names
val colsTrimmed = t.columns.filterNot(c => c.startsWith("_c")).map(c => c.replaceAll("\\s+",""))
// rename the columns using 'colsTrimmed'
val df = t.select(cols:_*).toDF(colsTrimmed:_*)

It works, but I have a feeling there must be a much more elegant way to do this.

scala> df.show 
+----+---------+--------+
|col1|     col2|    col3|
+----+---------+--------+
| 1.0|pi number|3.141592|
| 2.0| e number| 2.71828|
+----+---------+--------+

scala> df.printSchema 
root 
 |-- col1: double (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = true)
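
For what it is worth, the three intermediate vals above can be folded into a single select/toDF expression (a sketch along the same lines, not part of the original answer; the univocity-based UPDATE above remains the cleaner route):

// compute the kept column names once, then select and rename in one go;
// 'df2' is a hypothetical name and yields the same result as 'df' above
val kept = t.columns.filterNot(_.startsWith("_c"))
val df2  = t.select(kept.map(t(_)): _*).toDF(kept.map(_.replaceAll("\\s+", "")): _*)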

I had been thinking about writing a custom Spark data source, but your solution is so much simpler! Thanks. –


@JacekLaskowski, no, thank you! I have learned a lot from your [Mastering Apache Spark 2](https://www.gitbook.com/book/jaceklaskowski/mastering-apache-spark/details) and from your answers here. – MaxU