2016-09-21 54 views
2

我有一个(键,值)元素的RDD。键是NumPy数组。 NumPy数组不可散列,并且在尝试执行reduceByKey操作时会导致问题。Spark:当键是不可排列的numpy数组时,如何“reduceByKey”?

有没有办法给我的手动散列函数提供Spark上下文?或者有没有其他解决这个问题的方法(除了实际上将数组散列为“离线”并将Spark传递给散列键)?

下面是一个例子:

import numpy as np 
from pyspark import SparkContext 

sc = SparkContext() 

data = np.array([[1,2,3],[4,5,6],[1,2,3],[4,5,6]]) 
rd = sc.parallelize(data).map(lambda x: (x,np.sum(x))).reduceByKey(lambda x,y: x+y) 
rd.collect() 

错误是:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

...

TypeError: unhashable type: 'numpy.ndarray'

回答

2

最简单的解决方案是将其转换为一个对象,它是可哈希。例如:

from operator import add 

reduced = sc.parallelize(data).map(
    lambda x: (tuple(x), x.sum()) 
).reduceByKey(add) 

并在需要时再转换回来。

Is there a way to supply the Spark context with my manual hash function

不是一个简单的。整个机制取决于事实对象实现了一个__hash__方法和C扩展名不能被猴子修补。您可以尝试使用调度来覆盖pyspark.rdd.portable_hash,但即使考虑转换成本,我也怀疑这是否值得。