2016-08-11 60 views
0

我正在使用Spark 1.5.2从scala对象使用以下语法创建数据框。我的目的是为单元测试创​​建一个数据。Spark:SQL上下文:从Scala对象创建数据框

class Address (first:String = null, second: String = null, zip: String = null){} 
class Person (id: String = null, name: String = null, address: Seq[Address] = null){} 

def test() = { 

    val sqlContext = new SQLContext(sc) 

    import sqlContext.implicits._ 

    val persons = Seq(
    new Person(id = "1", name = "Salim", 
     address = Seq(new Address(first = "1st street"))), 
    new Person(name = "Sana", 
     address = Seq(new Address(zip = "60088"))) 
) 

    // The code can't infer schema automatically 
    val claimDF = sqlContext.createDataFrame(sc.parallelize(persons, 2),classOf[Person]) 

    claimDF.printSchema() // This prints "root" not the schema of Person. 
} 

相反,如果我转换人事和地址,以案例类,然后星火可以自动使用上述语法或使用sc.parallelize(persons, 2).toDF或使用sqlContext.createDataFrame(sc.parallelize(persons, 2),StructType)

我可以继承模式因为它不能容纳20个以上的字段,所以我们有很多字段。使用StructType会带来很多不便。案例类最方便,但不能容纳太多的属性。

请帮助,在此先感谢。

+0

我认为,如果你的类扩展[产品特点](http://www.scala-lang.org/api/2.10.6/#scala.Product),并实施其抽象方法它可能工作。 (由于这个签名:'createDataFrame [A <:Product](data:Seq [A])') –

回答

0

对代码进行两处更改将使printSchema()发出数据框的完整结构而不使用大小写类。

首先,丹尼尔建议,你需要对你的类扩展scala.Product特质(痛苦的,但需要以下.toDF法):

class Address (first:String = null, second: String = null, zip: String = null) extends Product with Serializable 
{ 
    override def canEqual(that: Any) = that.isInstanceOf[Address] 
    override def productArity: Int = 3 
    def productElement(n: Int) = n match { 
    case 0 => first; case 1 => second; case 2 => zip 
    } 
} 

class Person (id: String = null, name: String = null, address: Seq[Address] = null) extends Product with Serializable 
{ 
    override def canEqual(that: Any) = that.isInstanceOf[Person] 
    override def productArity: Int = 3 
    def productElement(n: Int) = n match { 
    case 0 => id; case 1 => name; case 2 => address 
    } 
} 

其次,你应该创建一个使用.toDF您的数据帧被使得与范围import sqlContext.implicits._而不是使用sqlContext.createDataFrame(..)像这样隐式方法:

val claimDF = sc.parallelize(persons, 2).toDF 

然后claimDF.printSchema()将打印:

root 
|-- id: string (nullable = true) 
|-- name: string (nullable = true) 
|-- address: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- first: string (nullable = true) 
| | |-- second: string (nullable = true) 
| | |-- zip: string (nullable = true) 

或者,您可以使用Scala 2.11.0-M3删除案例类别的22个字段限制。

1

非常感谢您的意见。

我们最终迁移到支持更大案例类的Scala 2.11的Spark 2.1,以便解决此问题。

对于Spark 1.6和Scala 2.10,我最终构建了Row对象和Struct类型来构建一个Dataframe。

val rows = Seq(Row("data")) 
val aRDD = sc.parallelize(rows) 
val aDF = sqlContext.createDataFrame(aRDD,getSchema()) 

def getSchema(): StructType= { 
    StructType(
     Array(
      StructField("jobNumber", StringType, nullable = true)) 
    ) 
}