1
对于以下RDD在星火定制RDD(显示相关部分):调用收集()上产生一个空的集合
val myRdd = new RDD[RddOutput](zippedRows) {
override def compute(split: Partition, context: TaskContext): Iterator[RddOutput] = {
..
val out = // computes a list of items
}
out.toIterator // Breakpoint set here: out is non-empty
}
}
当调用RDD:
val outVects = myRdd.collect
val veclen = outVects(0).size // outVects is null!
因此,作为评论音符:compute()中的输出迭代器非空,但是没有从collect()调用返回的数据。有任何想法吗?
您是否真的需要创建一个新的RDD? zippedRows.map/mapPartitions足够了吗? – zsxwing 2014-10-31 10:03:06
@zsxwing广播正在推动这一选择。如果/当这个问题得到解决,自定义RDD将不是必需的。 – javadba 2014-10-31 13:50:33
没有我们可以运行的例子,很难说出任何事情。这适用于我:'new RDD [Int](sc.parallelize(1 to 10)){override def compute(split:Partition,context:TaskContext)= Seq(1,2).iterator;重写def getPartitions = firstParent [Int] .partitions} .collect'。 – 2014-10-31 16:06:16