2017-06-21 58 views
0

我知道有很多关于这个问题,但我没能找到究竟需要可序列化系统的解释(以及何时序列化)......以及如何验证此要求。星火序列化(?)与特质古怪和差异1.6.0之间昂2.1.1

考虑一下:

class Baz 
trait Bar { val baz = new Baz; def bar(i: Int) = baz } 
case object Foo extends Bar { def foo = sc.parallelize(1 to 1).map(bar).collect } 
Foo.foo 

这工作,并返回Array(null) 是否有意义给任何人???

如果我改变vallazy val,比停止工作,并抛出NotSerializableException,这是有道理的 - 它初始化baz在远程端,然后无法把它送回来。 但是,为什么它在第一种情况下愉快地用null替代?

如果我写它,好看多了,我能想到的任何其他方式 - 从特质移动bar定义对象,例如,或者_ => baz替换bar调用 - 它也停止工作,并抱怨说Task is not serializable

与返回的是一个特征定义的VAL的方法,这使得它只是把它写为null,而不是这是什么?有任何想法吗?

UPDATE 上述行为发生在scala 2.11和spark 2.1.1上。 斯卡拉2.10(火花1.6.0)不会抛出异常,抱怨Baz不可序列化......所以,这似乎是一种倒退。

我注意到,在火花1.6.0,像这样工作得很好:

object Foo { def foo = sc.parallelize(1 to 1).map(bar).collect; def bar(i: Int) = i+1 } 
    Foo.foo 

但在火花2.1.1它抱怨说Foo是不可序列。这是为什么? 显然,序列化拉姆达想也序列化Foo,其中排序有道理......但它确实工作在1.6.0不知何故,即使我做labda实际引用其他的事情Foo

object Foo { 
    var stuff = 10 
    def foo = sc.parallelize(1 to 1).map(bar).collect 
    def bar(i: Int) = { stuff += 1; i+1 } 
    } 
    Foo.foo 
    Foo.stuff 

这工作正常1.6.0,而不是在2.1.1。

所以,这里的一个问题是它如何在实际工作1.6.0?我的意思是,Foo不是可序列化的,它怎么知道另一端的stuff的值?

另外,显而易见的,问题是 - 为什么它停止工作2.1.1? 1.6.0行为是否存在微妙的问题,我们是否应该不依赖它? 或者它只是2.1.1中的一个错误?

回答

0

这可能不是一个意义上的直接的答案,这将使你在背后星火,为什么你的使用情况将可能被以这种方式工作或其他的序列化问题的确切理由,但...让我提供一些线索在这。

SparkContext是一切发生的地方(或者至少在它开始的地方)。在这些方法中,你可以找到clean方法:

private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { 
    ClosureCleaner.clean(f, checkSerializable) 
    f 
} 

引用其scaladoc你应该提供有关星火怎么做的系列化验证足够的信息:

清洁封闭,使其准备序列化和发送(除去$ outer's中未引用的变量,更新REPL变量)

如果设置了checkSerializableclean还将主动检查f是否可序列化,并抛出SparkException如果不是。

搜索所有使用该方法的地方可能会使您的代码可能会或可能无法正常工作。这与将clean方法应用于您的代码一样简单。

你也可以开始与RDD.map算哪里找到clean方法:

val cleanF = sc.clean(f) 

这样,您可以建立的,为什么你的代码可以给每个你是否使用vallazy val不同的结果有一些了解。

我认为,在末尾,如下所示的代码可以改写:

// run spark-shell -c spark.driver.allowMultipleContexts=true 
// use :paste -raw 
package org.apache.spark 

class Baz 
trait Bar { lazy val baz = new Baz; def bar(i: Int) = baz } 
case object Foo extends Bar { 
    val sc = new SparkContext("local[*]", "Clean", new SparkConf) 
    def foo = sc.clean(bar _) 
} 

// org.apache.spark.Foo.foo 

我用spark-shell,这和它看来,代码工作正常与不Spark中2.3.0- lazy关键字SNAPSHOT今天建成。

+0

实际上,我对这些不良的唯一理解就是'val'和'lazy val'情况下的结果不同:) 其他一切似乎都像一大堆......神秘:) – Dima

+0

此外,忘了提及:你写的代码在使用和不使用“懒惰”的情况下的工作方式都是一样的,但它与我的(第一个)代码片段没有做同样的事情。如果你把'foo'改成'sc.parallelize(1 to 1).map(sc.clean(bar))。collect',那么它会以懒惰(这是可以理解的)抛出,但是“工作”并返回一个没有懒惰的_null_数组,这只是没有任何意义。 – Dima

相关问题