2015-10-16 69 views
0

在我的程序中,我有一种方法返回一些RDD,我们称之为myMethod,它采用不可序列化的参数,并让RDD的类型为Long(我的真正的RDD是元组类型,但只包含原始类型)。奇怪的“任务不可序列化”与星火

当我尝试这样的事:

val x: NonSerializableThing = ... 
val l: Long = ... 
myMethod(x, l).map(res => res + l) // myMethod's RDD does NOT include the NonSerializableThing 

我得到Task not serializable

当我用res + 1L代替res + l(即,某个常数)时,它运行。

从序列化跟踪中,它试图序列化NonSerializableThing和扼流器,但是我重新检查了我的方法,并且此对象从不出现在RDD中。

当我尝试直接收集myMethod输出,即与

myMethod(x, l).take(1) foreach println 

我也拿不出问题。

该方法使用NonSerializableThing获得上多个卡桑德拉查询由值的(本地)序列(这是必要的,因为我需要构造分区键来查询),像这样:

def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = { 
    val someParam1: String = x.someProperty 
    x.getSomeSeq.flatMap(y: OtherNonSerializableThing => { 
    val someParam2: String = y.someOtherProperty 
    y.someOtherSeq.map(someParam3: String => 
     sc.cassandraTable("fooKeyspace", "fooTable"). 
     select("foo"). 
     where("bar=? and quux=? and baz=? and l=?", someParam1, someParam2, someParam3, l). 
     map(_.getLong(0)) 
    }.reduce((a, b) => a.union(b)) 
} 

getSomeSeqsomeOtherSeq回归平淡无火花Seq小号

我想实现的是“联盟”多卡珊德拉查询。

这里有什么问题?

编辑,编,由杰姆·塔克的要求:

我有什么在我的课是这样的:

implicit class MySparkExtension(sc: SparkContext) { 

    def getThing(/* some parameters */): NonSerializableThing = { ... } 

    def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = { 
    val someParam1: String = x.someProperty 
    x.getSomeSeq.flatMap(y: OtherNonSerializableThing => { 
     val someParam2: String = y.someOtherProperty 
     y.someOtherSeq.map(someParam3: String => 
     sc.cassandraTable("fooKeyspace", "fooTable"). 
     select("foo"). 
     where("bar=? and quux=? and baz=? and l=?", someParam1, someParam2, someParam3, l). 
     map(_.getLong(0)) 
    }.reduce((a, b) => a.union(b)) 
    } 
} 

这包对象被声明。问题occurrs这里:

// SparkContext is already declared as sc 
import my.pkg.with.extension._ 

val thing = sc.getThing(/* parameters */) 
val l = 42L 
val rdd = sc.myMethod(thing, l) 
// until now, everything is OK. 
// The following still works: 
rdd.take(5) foreach println 
// The following causes the exception: 
rdd.map(x => x >= l).take(5) foreach println 
// While the following works: 
rdd.map(x => x >= 42L).take(5) foreach println 

我测试了进入“现场”成星火外壳以及在通过​​提交的算法。

我现在想尝试(按我最后的评论)如下:

implicit class MySparkExtension(sc: SparkContext) { 

    def getThing(/* some parameters */): NonSerializableThing = { ... } 

    def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = { 
    val param1 = x.someProperty 
    val partitionKeys = 
     x.getSomeSeq.flatMap(y => { 
     val param2 = y.someOtherProperty 
     y.someOtherSeq.map(param3 => (param1, param2, param3, l) 
     } 
    queryTheDatabase(partitionKeys) 
    } 

    private def queryTheDatabase(partitionKeys: Seq[(String, String, String, Long)]): RDD[Long] = { 
    partitionKeys.map(k => 
     sc.cassandraTable("fooKeyspace", "fooTable"). 
     select("foo"). 
     where("bar=? and quux=? and baz=? and l=?", k._1, k._2, k._3, k._4). 
     map(_.getLong(0)) 
    ).reduce((a, b) => a.union(b)) 
    } 
} 

我相信这可能是工作,因为RDD在方法queryTheDatabase现在,这里不存在NonSerializableThing构建。

另一种选择可能是:NonSerializableThing确实是可序列化的,但我传入SparkContext作为隐含的构造函数参数。我认为如果我做这个暂时的,它会(无用)被序列化,但不会造成任何问题。

+0

Plz post'mymethod'或至少是它的签名。 –

+0

我刚刚做到了。 – rabejens

+0

我仍然看不到'def mymethod(...)...'。 2.你的客体在哪里生活,他们的背景是什么? –

回答

1

当您将l替换为1L Spark不再尝试使用in中的方法/变量序列化类,因此不会抛出错误。

您应该能够通过将val x: NonSerializableThing = ...标记为瞬态即可修复。

@transient 
val x: NonSerializableThing = ... 

这意味着当类被序列化时,这个变量应该被忽略。

+0

当通过参数传入时,是否也可以将'x:NonSerialiyableThing'声明为transient?或者,如果我使用'@transient val x1:NonSerializableThing = x'并从此使用'x1'就足够了? – rabejens

+0

如果你的意思是作为参数传入类构造器,那么是的。你能发布包含此代码的完整类def吗? –

+0

不幸的是,我不能,因为我不允许发布公司代码。我将在明天尝试以下内容:构造包含所有分区键(仅包含字符串,长整型等)的'Seq',并将此(仅此)传递给在集群上执行Cassandra查询的私有方法。我认为这可能是一个可行的解决方法,因为在构建RDD时,范围中不存在NonSerializableThing。 – rabejens