2017-08-01 71 views
2

我有很多火花dataframes上,我需要做到以下几点:如何在火花数据框中强制重新分区?

1) load a single spark dataframe 
2) select rows from it 
3) merge it with all of the previous spark dataframes 

现在,每个上面的操作需要不同的numberof分区。选择行需要很多分区,比如1​​00个分区。合并需要非常少的分区,如10个分区。

所以,我真的希望它是这样工作的:

1) load a single spark dataframe 
1.5) repartition into 100 partitions 
2) select rows from it 
2.5) repartition into 10 partitions 
3) merge it with all of the previous spark dataframes 

现在,我怎么用力这2和3之间步骤1和2之间,并在重新分区的?

我知道,当我打电话给data = data.repartition(7)时,它被懒惰地评估,所以它只在实际保存时才重新分配。

所以,我一直在做这样的:

1) load a single spark dataframe 
1.5) repartition into 100 partitions 
1.75) `df.count()` *just* to force materialization 
2) select rows from it 
2.5) repartition into 10 partitions 
2.75) `df.count()` *just* to force materialization 
3) merge it with all of the previous spark dataframes 

有没有更好的办法来迫使它在这里重新分区之间的?有没有比在数据框上运行count()更好的方法?

回答

4

由于Spark中数据框的所有转换都是懒惰评估的,因此您需要执行一个操作来实际执行转换。目前没有其他方式来强制转换。

所有可用的数据帧操作都可以在documentation(查看动作)中找到。在你的情况下,而不是使用count()来强制转换,你可以使用first()这应该会更便宜。

在步骤2.5中,您可以用coalesce()代替repartition(),因为它可以避免完全洗牌。当新的分区数量少于以前时,这通常是有利的,因为它将使数据移动最小化。

编辑:

要回答你有关,如果你不使用任何行动,只是做会发生什么问题:1)重新分区,2)火花数据框中变换,3)重新划分。由于优化火花对转换的执行,似乎并不总是遵循这个顺序。我做了一个小测试程序来测试它:

val df = spark.sparkContext.parallelize(Array((1.0,"a"),(2.0,"b"),(3.0,"c"),(1.0,"d"),(2.0,"e"),(3.0,"f"))).toDF("x", "y") 
val df1 = df.repartition(10).filter($"x" =!= 1.0).repartition(5).filter($"y" =!= "b") 
df1.explain(true) 

这将返回有关如何计算数据帧的信息。

== Parsed Logical Plan == 
'Filter NOT ('y = b) 
+- Repartition 5, true 
    +- Filter NOT (x#5 = 1.0) 
     +- Repartition 10, true 
     +- Project [_1#2 AS x#5, _2#3 AS y#6] 
      +- LogicalRDD [_1#2, _2#3] 

== Analyzed Logical Plan == 
x: double, y: string 
Filter NOT (y#6 = b) 
+- Repartition 5, true 
    +- Filter NOT (x#5 = 1.0) 
     +- Repartition 10, true 
     +- Project [_1#2 AS x#5, _2#3 AS y#6] 
      +- LogicalRDD [_1#2, _2#3] 

== Optimized Logical Plan == 
Repartition 5, true 
+- Project [_1#2 AS x#5, _2#3 AS y#6] 
    +- Filter ((NOT (_1#2 = 1.0) && isnotnull(_2#3)) && NOT (_2#3 = b)) 
     +- LogicalRDD [_1#2, _2#3] 

== Physical Plan == 
Exchange RoundRobinPartitioning(5) 
+- *Project [_1#2 AS x#5, _2#3 AS y#6] 
    +- *Filter ((NOT (_1#2 = 1.0) && isnotnull(_2#3)) && NOT (_2#3 = b)) 
     +- Scan ExistingRDD[_1#2,_2#3] 

正如这里可以看到,在repartition(10)步骤是不包括在内,似乎在优化过程中已被删除。

+0

但有没有什么办法可以避免像'first()'这样的无用操作,当我真的不在乎它从中输出什么?我只是想重新分配,但我不关心它实际输出的内容。有什么办法可以避免这种情况? – Sother

+0

不幸的是,您不得不对数据框执行操作来应用所有转换,所以目前无法避免它。在答案中增加了一些信息。 – Shaido

+0

@Sother更新了答案。 – Shaido