在我的程序中,我有一种方法返回一些RDD,我们称之为myMethod
,它采用不可序列化的参数,并让RDD的类型为Long
(我的真正的RDD是元组类型,但只包含原始类型)。奇怪的“任务不可序列化”与星火
当我尝试这样的事:
val x: NonSerializableThing = ...
val l: Long = ...
myMethod(x, l).map(res => res + l) // myMethod's RDD does NOT include the NonSerializableThing
我得到Task not serializable
。
当我用res + 1L
代替res + l
(即,某个常数)时,它运行。
从序列化跟踪中,它试图序列化NonSerializableThing
和扼流器,但是我重新检查了我的方法,并且此对象从不出现在RDD中。
当我尝试直接收集myMethod
输出,即与
myMethod(x, l).take(1) foreach println
我也拿不出问题。
该方法使用NonSerializableThing
获得上多个卡桑德拉查询由值的(本地)序列(这是必要的,因为我需要构造分区键来查询),像这样:
def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
val someParam1: String = x.someProperty
x.getSomeSeq.flatMap(y: OtherNonSerializableThing => {
val someParam2: String = y.someOtherProperty
y.someOtherSeq.map(someParam3: String =>
sc.cassandraTable("fooKeyspace", "fooTable").
select("foo").
where("bar=? and quux=? and baz=? and l=?", someParam1, someParam2, someParam3, l).
map(_.getLong(0))
}.reduce((a, b) => a.union(b))
}
的getSomeSeq
和someOtherSeq
回归平淡无火花Seq
小号
我想实现的是“联盟”多卡珊德拉查询。
这里有什么问题?
编辑,编,由杰姆·塔克的要求:
我有什么在我的课是这样的:
implicit class MySparkExtension(sc: SparkContext) {
def getThing(/* some parameters */): NonSerializableThing = { ... }
def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
val someParam1: String = x.someProperty
x.getSomeSeq.flatMap(y: OtherNonSerializableThing => {
val someParam2: String = y.someOtherProperty
y.someOtherSeq.map(someParam3: String =>
sc.cassandraTable("fooKeyspace", "fooTable").
select("foo").
where("bar=? and quux=? and baz=? and l=?", someParam1, someParam2, someParam3, l).
map(_.getLong(0))
}.reduce((a, b) => a.union(b))
}
}
这包对象被声明。问题occurrs这里:
// SparkContext is already declared as sc
import my.pkg.with.extension._
val thing = sc.getThing(/* parameters */)
val l = 42L
val rdd = sc.myMethod(thing, l)
// until now, everything is OK.
// The following still works:
rdd.take(5) foreach println
// The following causes the exception:
rdd.map(x => x >= l).take(5) foreach println
// While the following works:
rdd.map(x => x >= 42L).take(5) foreach println
我测试了进入“现场”成星火外壳以及在通过提交的算法。
我现在想尝试(按我最后的评论)如下:
implicit class MySparkExtension(sc: SparkContext) {
def getThing(/* some parameters */): NonSerializableThing = { ... }
def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
val param1 = x.someProperty
val partitionKeys =
x.getSomeSeq.flatMap(y => {
val param2 = y.someOtherProperty
y.someOtherSeq.map(param3 => (param1, param2, param3, l)
}
queryTheDatabase(partitionKeys)
}
private def queryTheDatabase(partitionKeys: Seq[(String, String, String, Long)]): RDD[Long] = {
partitionKeys.map(k =>
sc.cassandraTable("fooKeyspace", "fooTable").
select("foo").
where("bar=? and quux=? and baz=? and l=?", k._1, k._2, k._3, k._4).
map(_.getLong(0))
).reduce((a, b) => a.union(b))
}
}
我相信这可能是工作,因为RDD在方法queryTheDatabase
现在,这里不存在NonSerializableThing
构建。
另一种选择可能是:NonSerializableThing
确实是可序列化的,但我传入SparkContext
作为隐含的构造函数参数。我认为如果我做这个暂时的,它会(无用)被序列化,但不会造成任何问题。
Plz post'mymethod'或至少是它的签名。 –
我刚刚做到了。 – rabejens
我仍然看不到'def mymethod(...)...'。 2.你的客体在哪里生活,他们的背景是什么? –