2016-07-24 59 views
1
from pyspark import SparkContext 

sc = SparkContext() 

rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)], numSlices=8) 
rdd2 = rdd1.mapValues(lambda x: x) 

这些RDDS具有相同的分区:PySpark加入洗牌共划分RDDS

rdd1.keys().glom().collect() 
>>> [[], ['a'], [], ['b'], [], ['c'], [], ['d']] 

rdd2.keys().glom().collect() 
>>> [[], ['a'], [], ['b'], [], ['c'], [], ['d']] 

这里有多个答案上,这样表明,加盟合作分区数据不会导致洗牌,这使得对我来说很有意义。例如:Does a join of co-partitioned RDDs cause a shuffle in Apache Spark?

然而,当我加入使用PySpark这些共划分RDDS,该数据被混洗到一个新的分区:

rdd1.join(rdd2).keys().glom().collect() 
>>> [['a'], [], ['c'], ['b'], [], ['d'], [], [], [], [], [], [], [], [], [], []] 

甚至当我设置的新的分区数的分区改变原来8:

rdd1.join(rdd2, numPartitions=8).keys().glom().collect() 
>>> [['a'], [], ['c'], ['b'], [], ['d'], [], []] 

为什么我不能避免使用这些共分区的RDD洗牌?

我正在使用Spark 1.6.0。

回答

3

在这种情况下既没有rdd1也不rdd2被定义划分

rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)]) 
rdd2 = rdd1.mapValues(lambda x: x) 

rdd1.partitioner is None 
## True 

rdd2.partitioner is None 
# True 

所以有不共分配。虽然你可以分区数据和加入:

n = rdd1.getNumPartitions() 
rdd1part = rdd1.partitionBy(n) 
rdd2part = rdd2.partitionBy(n) 

rdd1part.join(rdd2part) # rdd1part and rdd2part are co-partitioned 

这将简单地重新排列DAG,并不会阻止洗牌。

另请参见Default Partitioning Scheme in Spark

+0

谢谢,这可能只是解决我面临的问题。不过,我很惊讶,设置'numSlices'并不意味着明确的分区。 (我想现在我明白为什么它不叫'numPartitions',就像其他函数一样。) –

+2

当我们谈论Spark中的分区时,我们是指两个不同的概念。你可以查看我的答案http://stackoverflow.com/q/34491219/1560062一些解释。 – zero323