有时Spark会以低效的方式“优化”数据帧。考虑星火2.1下面的例子中(也可以在星火1.6转载):如何防止Spark优化
val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")
val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})
val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
df_result
.coalesce(1)
.saveAsTable(tablename)
在这个例子中,我希望有一个数据帧的昂贵的改造后写1个文件(这只是证明了问题的示例)。 Spark将coalesce(1)
向上移动,使得UDF仅应用于包含1分区的数据帧,从而破坏并行性(有趣的是,repartition(1)
不以此方式运行)。
概括地说,当我想在我的转换的某个部分中增加并行性,但之后会减少并行性时,会发生此行为。
我找到了一个解决方法,其包括高速缓存数据框,然后触发数据帧的完整的评价:
val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")
val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})
val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
.cache
df_result.rdd.count // trigger computation
df_result
.coalesce(1)
.saveAsTable(tablename)
我的问题是:有另一种方式告诉星火不改变的位置一定的转变?
简而言之,您想*实例化一个RDD与500个分区,然后*实例化*另一个将结果合并到1个分区中,以便您可以将其保存到单个文件中 - cf. https://stackoverflow.com/questions/31383904/how-can-i-force-spark-to-execute-code >>疯狂猜测:也许一个简单的调用'getNumPartitions()'将足以强制实例化,没有必须用'count()'来实际扫描结果... –
@SamsonScharfrichter不调用'getNumPartitions()'是不够的,并且不会阻止合并被“推高” –
巧合:我偶然发现了那个演示,来自最近的Spark Summit> https://www.slideshare.net/databricks/why-you-should-care-about-data-layout-in-the-file-system-with-cheng-lian-and-vida-公顷/ 40 –