
How do I create a Dataset from a custom class Person? I am trying to create a Dataset in Java, so I wrote the following code:

public Dataset<Person> createDataset() { 
    List<Person> list = new ArrayList<>(); 
    list.add(new Person("name", 10, 10.0)); 
    Dataset<Person> dataset = sqlContext.createDataset(list, Encoders.bean(Person.class)); 
    return dataset; 
} 

The Person class is an inner class.
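The layout is roughly like this (the enclosing class name and the field names are illustrative, reconstructed from the constructor call above):

public class MyJob {                      // "MyJob" is an illustrative name 
    public class Person implements java.io.Serializable { 
        // Non-static inner class: every instance holds a hidden 
        // reference to the enclosing MyJob instance 
        private String name; 
        private int age; 
        private double score; 

        public Person(String name, int age, double score) { 
            this.name = name; 
            this.age = age; 
            this.score = score; 
        } 
        // getters and setters omitted 
    } 
} 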

However, Spark throws the following exception:

org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `....` without access to the scope that this class was defined in. Try moving this class out of its parent class.; 

at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$2.applyOrElse(ExpressionEncoder.scala:264) 
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$2.applyOrElse(ExpressionEncoder.scala:260) 
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243) 
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243) 
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53) 
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242) 

How do I do this correctly?

Answers


TL;DR (spark-shell only): define your case classes first, and only once they are defined, use them. In a Spark/Scala application, using case classes should just work.

As of Spark 2.0.1, in spark-shell you should define the case classes first before you can access them to create a Dataset:

$ ./bin/spark-shell --version 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/ '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT 
      /_/ 

Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_102 
Branch master 
Compiled by user jacek on 2016-10-25T04:20:04Z 
Revision 483c37c581fedc64b218e294ecde1a7bb4b2af9c 
Url https://github.com/apache/spark.git 
Type --help for more information. 

$ ./bin/spark-shell 
scala> :pa 
// Entering paste mode (ctrl-D to finish) 

case class Person(id: Long) 

Seq(Person(0)).toDS // <-- this won't work 

// Exiting paste mode, now interpreting. 

<console>:15: error: value toDS is not a member of Seq[Person] 
     Seq(Person(0)).toDS // <-- this won't work 
        ^
scala> case class Person(id: Long) 
defined class Person 

scala> // the following implicit conversion *will* work 

scala> Seq(Person(0)).toDS 
res1: org.apache.spark.sql.Dataset[Person] = [id: bigint] 

Commit 43ebf7a9cbd70d6af75e140a6fc91bf0ffc2b877 (March 21, Spark 2.0.0-SNAPSHOT at the time) added a fix for this issue.

In the Scala REPL I had to add OuterScopes.addOuterScope(this) when :paste-ing the complete snippet, which was as follows:

scala> :pa 
// Entering paste mode (ctrl-D to finish) 

import sqlContext.implicits._ 
case class Token(name: String, productId: Int, score: Double) 
val data = Token("aaa", 100, 0.12) :: 
    Token("aaa", 200, 0.29) :: 
    Token("bbb", 200, 0.53) :: 
    Token("bbb", 300, 0.42) :: Nil 
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) 
val ds = data.toDS 

Using [Spark Notebook](http://spark-notebook.io) with Scala 2.11, indeed, adding 'org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)' after the case class definition and before using it in a DataFrame command solved this problem. –


I was asking about the addOuterScope method, in case you know why it has to be added for the encoder to work properly – eliasah


Thanks a lot for the update. I asked you because I was looking into this problem before, in http://stackoverflow.com/a/40232936/3415409 – eliasah


The solution is to add this line of code at the beginning of the method:

org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this); 
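Applied to the Java method from the question, this looks roughly as follows (a sketch, assuming createDataset is a member of the same outer class that declares the inner Person class, so that this is the enclosing instance the encoder needs):

import java.util.ArrayList; 
import java.util.List; 

import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.Encoders; 
import org.apache.spark.sql.catalyst.encoders.OuterScopes; 

public Dataset<Person> createDataset() { 
    // Register the enclosing instance so the encoder can resolve 
    // the inner Person class against it 
    OuterScopes.addOuterScope(this); 
    List<Person> list = new ArrayList<>(); 
    list.add(new Person("name", 10, 10.0)); 
    return sqlContext.createDataset(list, Encoders.bean(Person.class)); 
} 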

For a similar problem, my solution was to do exactly what the AnalysisException suggests: move the case class out of its parent class. For example, I had something like the following in Streaming_Base.scala:

abstract class Streaming_Base { 
    case class EventBean(id:String, command:String, recordType:String) 
    ... 
} 

I changed it to the following:

case class EventBean(id:String, command:String, recordType:String) 
abstract class Streaming_Base {   
    ... 
} 
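The same fix applies to the Java code in the question: Encoders.bean needs a class that Spark can instantiate on its own, i.e. a top-level (or static nested) JavaBean with a public no-argument constructor and getters/setters. A sketch, with the age and score field names assumed from the constructor call in the question:

public class Person implements java.io.Serializable { 
    private String name; 
    private int age;      // field names "age" and "score" are assumptions 
    private double score; 

    public Person() {}    // no-arg constructor required by the bean encoder 

    public Person(String name, int age, double score) { 
        this.name = name; 
        this.age = age; 
        this.score = score; 
    } 

    public String getName() { return name; } 
    public void setName(String name) { this.name = name; } 
    public int getAge() { return age; } 
    public void setAge(int age) { this.age = age; } 
    public double getScore() { return score; } 
    public void setScore(double score) { this.score = score; } 
} 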