2017-06-16 55 views
0

我想使用2 RDD之间的连接方法并将其保存到cassandra但我的代码不起作用。在开始时,我得到了一个巨大的Main方法,一切运行良好,但是当我使用函数和类时,这不起作用。我是新来斯卡拉和火花斯卡拉/ Spark可串行化错误 - 加入不起作用

代码:

class Migration extends Serializable { 

    case class userId(offerFamily: String, bp: String, pdl: String) extends Serializable 
    case class siteExternalId(site_external_id: Option[String]) extends Serializable 
    case class profileData(begin_ts: Option[Long], Source: Option[String]) extends Serializable 

    def SparkMigrationProfile(sc: SparkContext) = { 

    val test = sc.cassandraTable[siteExternalId](KEYSPACE,TABLE) 
    .keyBy[userId] 
    .filter(x => x._2.site_external_id != None) 

    val profileRDD = sc.cassandraTable[profileData](KEYSPACE,TABLE) 
    .keyBy[userId] 

    //dont work 
    test.join(profileRDD) 
    .foreach(println) 

    // don't work 
    test.join(profileRDD) 
    .saveToCassandra(keyspace, table) 

    } 

在beginig我得到了著名:线程“main” org.apache.spark.SparkException例外:在任务不能序列。 。 。 所以我扩展我的主类和案例类,但stil不工作。

回答

0

我认为你应该将你的案例类从Migration类移到专用文件和/或对象。这应该可以解决你的问题。另外,Scala案例类默认是可序列化的。

+0

它的工作!我现在变得如此愚蠢。 。 。 你能向我解释为什么? – user3394825

+0

hi @ user3394825,很难说,因为我没有在Cassandra中使用Spark。根据我的经验,当使用其他类中定义的案例类时,我遇到了类似的问题。在你的情况下,为'cassandraTable'函数创建隐式参数可能会有一些问题(https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/ com/datastax/spark/connector/SparkContextFunctions.scala)例如'rrf:RowReaderFactory [T], ev:ValidRDDType [T]',但我只是猜测。我知道当使用Spark SQL Encoder时,也有类似的例外。 –

+0

案例类在技术上可以访问封装的迁移实例的内部类。当它们被序列化时,附带的迁移对象也会被序列化。即使它被标记为可序列化,但其中可能有某个实例变量不在其中。通常罪魁祸首是一个SparkContext对象。 –