2016-12-05 141 views
0

对不起,但我需要再次提出问题。我希望这一个不重复。我编辑了last one,但我认为没人看到编辑过的版本。这是问题的一个简单的例子:Spark DataFrame映射错误

val spark = SparkSession 
.builder() 
.appName("test") 
.getOrCreate() 

val field = StructField("1", BooleanType, false) 
val schema = StructType(field::Nil) 
val rowRDD = spark.sparkContext.parallelize(Array(Row(true),Row(false))) 
val df = spark.createDataFrame(rowRDD, schema) 

val new_df = //Add hundred of new columns 

//here is the error 
val df_2 = new_df.flatMap(row => if(test(row)) row::Nil else Nil) 

错误:

error: Unable to find encoder for type stored in a Dataset. 
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ 
Support for serializing other types will be added in future releases. 

我想要做的是,要修改的每一行。在这种情况下,我知道,只有1列,我可以像Encoder error while trying to map dataframe row to updated row一样处理它。 但是,如果我有数百列的话,我该如何解决这个问题? 我想删除一些行,如果他们不满足条件。 目前我使用:

val df_2 = new_df.rdd.flatMap(row => if(test(row)) row::Nil else Nil) 

但是,我认为,这是最好的解决。求助

Exception in thread "main" java.lang.StackOverflowError 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468) 

TY :)

+0

我想使用DF,因为我需要架构。 DF有没有地图相似的功能?我想在某些条件下删除或扩展DF中的某一行。 –

回答

0

添加新列的整个数据集将工作withColumn()选项:我也是在的StackOverflowError运行。如果你有更多的专栏,它会让事情变得更糟。 您可以使用Spark SQL并在SQL样式中添加查询以添加新列。这将需要更多的SQL技能,而不仅仅是火花。和100列,可能是维护将是艰难的。

您可以按照另一种方法。

您可以将rdd转换为数据框。然后在数据框上使用映射并按照您的意愿处理每一行。里面的地图法,

a。您可以根据计算收集新值。

b。添加这些新的列值到主RDD如下

val newColumns: Seq[Any] = Seq(newcol1,newcol2) 
Row.fromSeq(row.toSeq.init ++ newColumns) 

这里行,是行的地图方法

c中的参考。如下创建新模式

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType)) 

d。添加到旧模式

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType) 

e。使用新列创建新数据帧

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema) 
+0

Thnx为您的答案,但我怎么能使用DataFrame上的map()与很多列?我收到上面的错误。我所有的列都是布尔值。 –

+0

您提到的问题是向现有数据框添加新列和更多列。所以上述步骤将有助于 – Ramzy

+0

问题是,如何使用map()和数百列而不会出现错误? –