我正在使用scala的/:操作符计算一系列数据集聚合。为聚合的代码如下所示:当我尝试运行了哪些形式传入f参数单独的功能列表块发生Scala中汇总Spark数据集的问题
def execute1(
xy: DATASET,
f: Double => Double): Double = {
println("PRINTING: The data points being evaluated: " + xy)
println("PRINTING: Running execute1")
var z = xy.filter{ case(x, y) => abs(y) > EPS}
var ret = - z./:(0.0) { case(s, (x, y)) => {
var px = f(x)
s + px*log(px/y)}
}
ret
}
我的问题。的功能的列表是:
lazy val pdfs = Map[Int, Double => Double](
1 -> betaScaled,
2 -> gammaScaled,
3 -> logNormal,
4 -> uniform,
5 -> chiSquaredScaled
)
穿过列表运行聚合的执行程序功能是:
def execute2(
xy: DATASET,
fs: Iterable[Double=>Double]): Iterable[Double] = {
fs.map(execute1(xy, _))
}
随着最终执行块:
val kl_rdd = master_ds.mapPartitions((it:DATASET) => {
val pdfsList = pdfs_broadcast.value.map(
n => pdfs.get(n).get
)
execute2(it, pdfsList).iterator
的问题是,当聚合确实发生,它们似乎都聚集在输出数组的第一个槽中,当我希望分别显示每个功能的聚合时。我跑了测试,以确认所有五个功能实际上正在运行,并且他们正在第一个槽中被总结。
The pre-divergence value: -4.999635700491883
The pre-divergence value: -0.0
The pre-divergence value: -0.0
The pre-divergence value: -0.0
The pre-divergence value: -0.0
这是我曾经遇到最困难的问题之一,因此任何方向将不胜感激。将在其应有的地方给予信贷。谢谢!