2016-10-11 70 views
0

我想在我的combineByKey/reduceByKey/foldByKey中有依赖于当前正在操作的键的逻辑。从方法特征中我可以看出,传递给这些方法的唯一参数是组合/缩小/折叠的值。为什么我不能在减少逻辑中引用键?

用一个简单的例子,我只是有一个RDD是(int, int)元组,我想要的结果是tuple[0]键入一个RDD其中值最接近键int

例如:

(1, 8) 
(1, 3) 
(1, -1) 
(2, 4) 
(2, 5) 
(2, 2) 
(3, 2) 
(3, 4) 

应减少到:

(1, 3) 
(2, 2) 
(3, 2) 

注意,在比较(1, 3)(1, -1)我不在乎哪一个是挑​​选,因为它们都是相同的距离。 “3”键相同。

我可以想象这样做的方法是沿着线的东西:

rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2) 

reduce功能只需要两个参数:要合并两个值。看起来最简单的方法是参考我的减速器中的钥匙以实现我的目标;这可能吗?

如果我试试这个,我得到一个错误:

rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)]) 
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2).collect() 

TypeError:() takes exactly 3 arguments (2 given)

我真的不寻找一个解决方案,这个例子中的问题。我想知道的是,如果有一个原因,钥匙没有传递给reduceByKey函数?我认为这是我遗失的地图缩减哲学的一些基本原理。


注我可以通过插入一个映射步骤,其每个值映射到由该值和从钥匙的距离的元组解决我例如:

rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)]) 
rdd = rdd.map(lambda tup: (tup[0], tuple([tup[1], abs(tup[0] - tup[1])]))) 
rdd.reduceByKey(lambda v1, v2: v1 if v1[1] < v2[1] else v2).mapValues(lambda x: x[0]).collectAsMap() 

回答

0

我认为没有很强的理由不要传递钥匙。
但是,我觉得reduceByKey API是为通用用例设计的 - 计算每个键值的总和。到目前为止,我从来都不需要在计算值时使用键。但那只是我的个人意见。

另外你解决的问题似乎是简单的聚合问题。 min()groupByKey可以找到答案。我知道你不是在寻找解决方案,而是在于如何写作。

from pyspark import SparkContext 

sc = SparkContext() 
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)]) 
reduced = rdd.groupByKey().map(lambda (k, v): (k, min(v, key=lambda e:abs(e-k)))) 
print(reduced.collectAsMap()) 

结果

{1: 3, 2: 2, 3: 2} 
+0

尼斯的答案。我的问题的真正答案很可能仅仅是“因为这不是API”。但无论如何我都在想这件事。 – FGreg

相关问题