2017-03-07 108 views
0

考虑代码:怪异的行为

val df1 = spark.table("t1").filter(col("c1")=== lit(127)) 
val df2 = spark.sql("select x,y,z from ORCtable") 
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*), 
    trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter") 
df3.select($"y_R",$"z_R").show(500,false) 

这是生产的代码失败java.lang.OutOfMemoryError: GC overhead limit exceeded警告WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.

但是,如果我运行下面的代码:

val df1 = spark.table("t1").filter(col("c1")=== lit(127)) 
val df2 = spark.sql("select x,y,z from ORCtable limit 2000000")//only difference here 
//ORC table has 1651343 rows so doesn't exceed limit 2000000 
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*), 
    trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter") 
df3.select($"y_R",$"z_R").show(500,false) 

这将产生正确的输出。我不知道为什么会发生这种情况以及发生了什么变化。有人可以帮助理解这一点吗?

回答

2

回答我自己的问题:火花physical execution plan是不同的两种方式产生相同的dataframe可以通过调用.explain()方法来检查。

第一种方法使用broadcast-hash join,这导致java.lang.OutOfMemoryError: GC overhead limit exceeded,而后一种方式运行sort-merge join,通常速度较慢,但​​不会对垃圾收集造成太大影响。

物理执行计划的这种差异由df2 dataframe上的附加filter操作引入。