2014-10-31 54 views
1

对于以下RDD在星火定制RDD(显示相关部分):调用收集()上产生一个空的集合

val myRdd = new RDD[RddOutput](zippedRows) { 

    override def compute(split: Partition, context: TaskContext): Iterator[RddOutput] = { 
     .. 
     val out = // computes a list of items 
    } 
    out.toIterator // Breakpoint set here: out is non-empty 
    } 

} 

当调用RDD:

val outVects = myRdd.collect 
val veclen = outVects(0).size // outVects is null! 

因此,作为评论音符:compute()中的输出迭代器非空,但是没有从collect()调用返回的数据。有任何想法吗?

+0

您是否真的需要创建一个新的RDD? zippedRows.map/mapPartitions足够了吗? – zsxwing 2014-10-31 10:03:06

+0

@zsxwing广播正在推动这一选择。如果/当这个问题得到解决,自定义RDD将不是必需的。 – javadba 2014-10-31 13:50:33

+0

没有我们可以运行的例子,很难说出任何事情。这适用于我:'new RDD [Int](sc.parallelize(1 to 10)){override def compute(split:Partition,context:TaskContext)= Seq(1,2).iterator;重写def getPartitions = firstParent [Int] .partitions} .collect'。 – 2014-10-31 16:06:16

回答

0

此问题似乎与不一致的分区方案有关。 MyRdd被赋予与父项相同的分区。然后我添加了一个转换步骤,但没有更新分区以指向新的父节点:分区指向新的祖父节点RDD。