对不起,但我需要再次提出问题。我希望这一个不重复。我编辑了last one,但我认为没人看到编辑过的版本。这是问题的一个简单的例子:Spark DataFrame映射错误
val spark = SparkSession
.builder()
.appName("test")
.getOrCreate()
val field = StructField("1", BooleanType, false)
val schema = StructType(field::Nil)
val rowRDD = spark.sparkContext.parallelize(Array(Row(true),Row(false)))
val df = spark.createDataFrame(rowRDD, schema)
val new_df = //Add hundred of new columns
//here is the error
val df_2 = new_df.flatMap(row => if(test(row)) row::Nil else Nil)
错误:
error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._
Support for serializing other types will be added in future releases.
我想要做的是,要修改的每一行。在这种情况下,我知道,只有1列,我可以像Encoder error while trying to map dataframe row to updated row一样处理它。 但是,如果我有数百列的话,我该如何解决这个问题? 我想删除一些行,如果他们不满足条件。 目前我使用:
val df_2 = new_df.rdd.flatMap(row => if(test(row)) row::Nil else Nil)
但是,我认为,这是最好的解决。求助
Exception in thread "main" java.lang.StackOverflowError
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
TY :)
我想使用DF,因为我需要架构。 DF有没有地图相似的功能?我想在某些条件下删除或扩展DF中的某一行。 –