2017-04-06 69 views
4

我有以下内容输入文件foo.txt斯卡拉 - 星火(version1.5.2)Dataframes分裂错误

c1|c2|c3|c4|c5|c6|c7|c8| 
00| |1.0|1.0|9|27.0|0|| 
01|2|3.0|4.0|1|10.0|1|1| 

我想将它转换成一个Dataframe执行一些Sql查询:

var text = sc.textFile("foo.txt") 
var header = text.first() 
var rdd = text.filter(row => row != header) 
case class Data(c1: String, c2: String, c3: String, c4: String, c5: String, c6: String, c7: String, c8: String) 

直到此时一切正常,问题就来了下一句:

var df = rdd.map(_.split("\\|")).map(p => Data(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7))).toDF() 

如果我尝试打印dfdf.show,我得到一个错误信息:

scala> df.show() 
java.lang.ArrayIndexOutOfBoundsException: 7 

我知道错误可能是由于拆分句子。我也尝试使用下面的语法分裂foo.txt

var df = rdd.map(_.split("""|""")).map(p => Data(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7))).toDF() 

然后我得到的东西是这样的:

scala> df.show() 
+------+---------+----------+-----------+-----+-----------+----------------+----------------+ 
| c1 |  c2 | c3 |  c4 | c5 |  c6 |  c7  |  c8  | 
+------+---------+----------+-----------+-----+-----------+----------------+----------------+ 
|  0|  0|   ||   | ||   1|    .|    0| 
|  0|  1|   ||   2| ||   3|    .|    0| 
+------+---------+----------+-----------+-----+-----------+----------------+----------------+ 

因此,我的问题是我怎么能正确此文件传递到数据帧。

编辑:错误是在第一行由于||字段没有中间空间。这种类型的字段定义取决于示例工作正常或崩溃。

+1

我不能重现错误,在Spark 2.0上它工作正常。 (在df.show()上没有ArrayIndexOutOfBoundsException) – jamborta

+0

对不起,我忘了评论它是在Spark 1.5.2上。我将编辑问题 – qwerty

+0

您不能将文件重命名为'.csv',并直接将其读入'df'中? – pheeleeppoo

回答

3

这是因为您的一条线路是比别人更短:

scala> var df = rdd.map(_.split("\\|")).map(_.length).collect() 
df: Array[Int] = Array(7, 8) 

您可以手动行填写(但你需要手动处理各种情况下):

val df = rdd.map(_.split("\\|")).map{row => 
    row match { 
    case Array(a,b,c,d,e,f,g,h) => Data(a,b,c,d,e,f,g,h) 
    case Array(a,b,c,d,e,f,g) => Data(a,b,c,d,e,f,g," ") 
    } 
} 

scala> df.show() 
+---+---+---+---+---+----+---+---+ 
| c1| c2| c3| c4| c5| c6| c7| c8| 
+---+---+---+---+---+----+---+---+ 
| 00| |1.0|1.0| 9|27.0| 0| | 
| 01| 2|3.0|4.0| 1|10.0| 1| 1| 
+---+---+---+---+---+----+---+---+ 

编辑:

一个更通用的解决办法是这样的:

val df = rdd.map(_.split("\\|", -1)).map(_.slice(0,8)).map(p => Data(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7))).toDF() 

如果您认为始终有适当数量的分隔符,则使用此语法可以安全地截断最后一个值。

+0

第二个片段是我正在寻找的东西,但我错过了一些东西。在这个例子中,空字段在最后一个字段,但想象它可能在8列中的任何一列。考虑到这种叠加,你如何推广你的代码以正确工作? – qwerty

+0

为什么包含'.map(_。slice(0,8))'? – qwerty

+1

只是为了表明它更长。如果你只是想把它提供给案例课,这是没有必要的。 – jamborta

2

我的建议是使用databrick的csv解析器。

链接:https://github.com/databricks/spark-csv

来加载例如:

我装与你相似的示例文件:

c1|c2|c3|c4|c5|c6|c7|c8| 
00| |1.0|1.0|9|27.0|0|| 
01|2|3.0|4.0|1|10.0|1|1| 

要创建数据框使用下面的代码:

val df = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", "true") // Use first line of all files as header 
    .option("inferSchema", "true") // Automatically infer data types 
    .option("delimiter", "|") // default is "," 
    .load("foo.txt") 
    .show 

我得到了下面的输出

+---+---+---+---+---+----+---+----+---+ 
| c1| c2| c3| c4| c5| c6| c7| c8| | 
+---+---+---+---+---+----+---+----+---+ 
| 0| |1.0|1.0| 9|27.0| 0|null| | 
| 1| 2|3.0|4.0| 1|10.0| 1| 1| | 
+---+---+---+---+---+----+---+----+---+ 

这样你就不必费心自己解析文件了。你直接得到一个数据帧