2017-05-26 43 views
0

我正在使用scala的/:操作符计算一系列数据集聚合。为聚合的代码如下所示:当我尝试运行了哪些形式传入f参数单独的功能列表块发生Scala中汇总Spark数据集的问题

def execute1( 
xy: DATASET, 
f: Double => Double): Double = { 

println("PRINTING: The data points being evaluated: " + xy) 
println("PRINTING: Running execute1") 

var z = xy.filter{ case(x, y) => abs(y) > EPS} 

var ret = - z./:(0.0) { case(s, (x, y)) => { 
    var px = f(x) 
    s + px*log(px/y)} 
} 

ret 
} 

我的问题。的功能的列表是:

lazy val pdfs = Map[Int, Double => Double](
1 -> betaScaled, 
2 -> gammaScaled, 
3 -> logNormal, 
4 -> uniform, 
5 -> chiSquaredScaled 
) 

穿过列表运行聚合的执行程序功能是:

def execute2( 
xy: DATASET, 
fs: Iterable[Double=>Double]): Iterable[Double] = { 
fs.map(execute1(xy, _)) 
} 

随着最终执行块:

val kl_rdd = master_ds.mapPartitions((it:DATASET) => { 
val pdfsList = pdfs_broadcast.value.map(
    n => pdfs.get(n).get 
) 

execute2(it, pdfsList).iterator 

的问题是,当聚合确实发生,它们似乎都聚集在输出数组的第一个槽中,当我希望分别显示每个功能的聚合时。我跑了测试,以确认所有五个功能实际上正在运行,并且他们正在第一个槽中被总结。

The pre-divergence value: -4.999635700491883 
The pre-divergence value: -0.0 
The pre-divergence value: -0.0 
The pre-divergence value: -0.0 
The pre-divergence value: -0.0 

这是我曾经遇到最困难的问题之一,因此任何方向将不胜感激。将在其应有的地方给予信贷。谢谢!

回答

1

星火的数据集没有foldLeft(又名/:):https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset和实际需要类型参数DataSet[T],它的名字是不是所有的死刑案件。

所以,我想你的DATASET的类型是一个迭代器,所以它在第一次运行execute1后得到耗尽,因此每个后续的execute1都会获得空的迭代器。基本上,它不汇总所有函数 - 它只执行第一个函数并忽略其他函数(因为您将0.0作为初始值传递给foldLeft,所以您会得到-0.0)。

你可以从mapPartitions签名看到:

def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U] 

它给你一个迭代器(即只能进行一次穿越可变结构),所以你应该为了得到(潜在但限于大)做it.toList不可变的结构(List)。

P.S.如果您想真正使用Spark的DataSet/RDD - 使用aggregate(RDD)或agg(DataSet)。另请参见:foldLeft or foldRight equivalent in Spark?


说明关于迭代器:

scala> val it = List(1,2,3).toIterator 
it: Iterator[Int] = non-empty iterator 

scala> it.toList //traverse iterator and accumulate its data into List 
res0: List[Int] = List(1, 2, 3) 

scala> it.toList //iterator is drained, so second call doesn't traverse anything 
res1: List[Int] = List()