我想在我的combineByKey
/reduceByKey
/foldByKey
中有依赖于当前正在操作的键的逻辑。从方法特征中我可以看出,传递给这些方法的唯一参数是组合/缩小/折叠的值。为什么我不能在减少逻辑中引用键?
用一个简单的例子,我只是有一个RDD是(int, int)
元组,我想要的结果是tuple[0]
键入一个RDD其中值最接近键int
。
例如:
(1, 8)
(1, 3)
(1, -1)
(2, 4)
(2, 5)
(2, 2)
(3, 2)
(3, 4)
应减少到:
(1, 3)
(2, 2)
(3, 2)
注意,在比较(1, 3)
和(1, -1)
我不在乎哪一个是挑选,因为它们都是相同的距离。 “3”键相同。
我可以想象这样做的方法是沿着线的东西:
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2)
但reduce
功能只需要两个参数:要合并两个值。看起来最简单的方法是参考我的减速器中的钥匙以实现我的目标;这可能吗?
如果我试试这个,我得到一个错误:
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2).collect()
TypeError:() takes exactly 3 arguments (2 given)
我真的不寻找一个解决方案,这个例子中的问题。我想知道的是,如果有一个原因,钥匙没有传递给reduceByKey
函数?我认为这是我遗失的地图缩减哲学的一些基本原理。
注我可以通过插入一个映射步骤,其每个值映射到由该值和从钥匙的距离的元组解决我例如:
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd = rdd.map(lambda tup: (tup[0], tuple([tup[1], abs(tup[0] - tup[1])])))
rdd.reduceByKey(lambda v1, v2: v1 if v1[1] < v2[1] else v2).mapValues(lambda x: x[0]).collectAsMap()
尼斯的答案。我的问题的真正答案很可能仅仅是“因为这不是API”。但无论如何我都在想这件事。 – FGreg