我遇到了Spark Scala脚本的一个小问题。基本上我有原始数据,我正在做分组和计数等聚合后,我想将输出保存为特定的JSON格式。Spark Dataframe架构定义使用带案例类和列名别名的反射
编辑:
我试图简化的问题,并改写它:
当我选择与Array[org.apache.spark.sql.Column]
其中的列名有别名,然后使用列名从源数据帧的数据(或当试图将行映射到case类时,我得到一个“Task not serializable”异常。
var dm = sqlContext.createDataFrame(Seq((1,"James"),(2,"Anna"))).toDF("id", "name")
val cl = dm.columns
val cl2 = cl.map(name => col(name).as(name.capitalize))
val dm2 = dm.select(cl2:_*)
val n = "Name"
case class Result(Name:String)
val r = dm2.map(row => Result(row.getAs(n))).toDF
,第二部分或问题,我实际需要的最终模式是这些Result
类对象的数组。我还没有想出,如何做到这一点。预期的结果应该有一个这样的模式:
case class Test(var FilteredStatistics: Array[Result])
val t = Test(Array(Result("Anna"), Result("James")))
val t2 = sc.parallelize(Seq(t)).toDF
scala> t2.printSchema
root
|-- FilteredStatistics: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Name: string (nullable = true)
TL; DR:
如何数据帧行映射到一个案例类对象时,数据框列有别名和变量用于列名?
如何将这些case类对象添加到数组?
序列化问题没有重现 - 我复制了所有的代码,它适用于我。看起来像代码中的某个地方(不粘贴在这里?),您正在使用DataFrame中使用的Case类中的org.apache.spark.sql.Column对象,或者在序列化并发送给工作人员的转换中使用... –
顺便说一句 - 我们中的一个可能会迷失在这个非常复杂的问题的细节中......尝试最小化它(很多) - 找到再现问题的最简单的例子(在类似的最小化后分别询问另一个问题) –
one修复尝试你的序列化问题...... class Result(???)extends Serializable;对象结果{def apply(r:Row):结果= r匹配{??? }}然后在r上使用模式匹配来处理你在DF中可能有的各种格式。当你试图将一个类应用到行的一部分时,通常会遇到问题,但是如果创建了一个可以映射整行的类......那么'DF.map(Result)'可能会起作用。 – kmh