2017-06-12 58 views
2

有时Spark会以低效的方式“优化”数据帧。考虑星火2.1下面的例子中(也可以在星火1.6转载):如何防止Spark优化

val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value") 

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d}) 

val df_result = df 
.withColumn("udfResult",expensiveUDF($"value")) 

df_result 
.coalesce(1) 
.saveAsTable(tablename) 

在这个例子中,我希望有一个数据帧的昂贵的改造后写1个文件(这只是证明了问题的示例)。 Spark将coalesce(1)向上移动,使得UDF仅应用于包含1分区的数据帧,从而破坏并行性(有趣的是,repartition(1)不以此方式运行)。

概括地说,当我想在我的转换的某个部分中增加并行性,但之后会减少并行性时,会发生此行为。

我找到了一个解决方法,其包括高速缓存数据框,然后触发数据帧的完整的评价:

val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value") 

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d}) 

val df_result = df 
.withColumn("udfResult",expensiveUDF($"value")) 
.cache 

df_result.rdd.count // trigger computation 

df_result 
.coalesce(1) 
.saveAsTable(tablename) 

我的问题是:有另一种方式告诉星火不改变的位置一定的转变?

+0

简而言之,您想*实例化一个RDD与500个分区,然后*实例化*另一个将结果合并到1个分区中,以便您可以将其保存到单个文件中 - cf. https://stackoverflow.com/questions/31383904/how-can-i-force-spark-to-execute-code >>疯狂猜测:也许一个简单的调用'getNumPartitions()'将足以强制实例化,没有必须用'count()'来实际扫描结果... –

+0

@SamsonScharfrichter不调用'getNumPartitions()'是不够的,并且不会阻止合并被“推高” –

+0

巧合:我偶然发现了那个演示,来自最近的Spark Summit> https://www.slideshare.net/databricks/why-you-should-care-about-data-layout-in-the-file-system-with-cheng-lian-and-vida-公顷/ 40 –

回答

3

其实这是不是因为SparkSQL的优化,SparkSQL不会改变合并操作的位置,作为执行计划显示:

Coalesce 1 
+- *Project [value#2, UDF(value#2) AS udfResult#11] 
    +- *SerializeFromObject [input[0, double, false] AS value#2] 
     +- Scan ExternalRDDScan[obj#1] 

我引述聚结API的描述一段:

注意:本段由jira SPARK-19399添加。所以它不应该在2.0的API中找到。

但是,如果您正在进行剧烈的聚结,到numPartitions = 1,这可能导致您的计算发生在比您喜欢的更少的节点 (例如,在numPartitions = 1的情况下为一个节点)。要避免这种情况,你可以调用重新分区。这将添加一个混洗步骤, ,但意味着当前的上游分区将并行执行 (无论当前分区是什么)。

coalesce API不执行混洗,但会导致先前RDD和当前RDD之间的狭窄依赖关系。由于RDD是懒惰评估,计算实际上是通过合并分区完成的。

要防止它,您应该使用重新分区API。