2016-12-20 61 views
3

我遇到了Spark Scala脚本的一个小问题。基本上我有原始数据,我正在做分组和计数等聚合后,我想将输出保存为特定的JSON格式。Spark Dataframe架构定义使用带案例类和列名别名的反射

编辑:

我试图简化的问题,并改写它:

当我选择与Array[org.apache.spark.sql.Column]其中的列名有别名,然后使用列名从源数据帧的数据(或当试图将行映射到case类时,我得到一个“Task not serializable”异常。

var dm = sqlContext.createDataFrame(Seq((1,"James"),(2,"Anna"))).toDF("id", "name") 

val cl = dm.columns 
val cl2 = cl.map(name => col(name).as(name.capitalize)) 
val dm2 = dm.select(cl2:_*) 
val n = "Name" 
case class Result(Name:String) 
val r = dm2.map(row => Result(row.getAs(n))).toDF 

,第二部分或问题,我实际需要的最终模式是这些Result类对象的数组。我还没有想出,如何做到这一点。预期的结果应该有一个这样的模式:

case class Test(var FilteredStatistics: Array[Result]) 
    val t = Test(Array(Result("Anna"), Result("James"))) 

    val t2 = sc.parallelize(Seq(t)).toDF 

    scala> t2.printSchema 
    root 
    |-- FilteredStatistics: array (nullable = true) 
    | |-- element: struct (containsNull = true) 
    | | |-- Name: string (nullable = true) 

TL; DR

  1. 如何数据帧行映射到一个案例类对象时,数据框列有别名和变量用于列名?

  2. 如何将这些case类对象添加到数组?

+0

序列化问题没有重现 - 我复制了所有的代码,它适用于我。看起来像代码中的某个地方(不粘贴在这里?),您正在使用DataFrame中使用的Case类中的org.apache.spark.sql.Column对象,或者在序列化并发送给工作人员的转换中使用... –

+2

顺便说一句 - 我们中的一个可能会迷失在这个非常复杂的问题的细节中......尝试最小化它(很多) - 找到再现问题的最简单的例子(在类似的最小化后分别询问另一个问题) –

+0

one修复尝试你的序列化问题...... class Result(???)extends Serializable;对象结果{def apply(r:Row):结果= r匹配{??? }}然后在r上使用模式匹配来处理你在DF中可能有的各种格式。当你试图将一个类应用到行的一部分时,通常会遇到问题,但是如果创建了一个可以映射整行的类......那么'DF.map(Result)'可能会起作用。 – kmh

回答

0

序列化问题:这里的问题是val n = "Name":它被传递到RDD变换(dm2.map(...))一个匿名函数,这使得火花接近超过该变量和含有它范围内使用,其中还包括cl2,其类型为Array[Column],因此它不是可序列化的。

解决方案很简单 - 要么内嵌n(要获得dm2.map(row => Result(row.getAs("Name")))),要么将其置于Serializable上下文(一个对象或不包含任何不可序列化成员的类)中。

+0

好的,我仍然有一些困难。我不想这样做,因为我在一个地方声明了列名,然后在整个代码中使用它们。我不想复制它们,因为那样我就不得不在多个地方改变它们。但是你是否意味着我应该创建一个对象或类来保存必要的变量?我尝试了'对象Params extends Serializable {val name =“Name”}'和'case class Params(name:String)'(案例类应该是默认可序列化的)并且在getAs中使用'Params.name',没有帮助。对不起,如果我错过了明显的东西 –

+0

如果对象没有嵌套在任何其他类 - 它应该工作(为我工作...)。请注意,当您在* spark-shell *中创建对象时,它们实际上嵌套在封装整个shell代码的对象中,所以它可能无法在那里工作(我认为) –