2017-06-23 50 views
3

在spark DF中使用dropDuplicates函数时将保留哪一行?这在火花文档中没有说明。dropDuplicates操作符中使用了哪些行?

  1. (按行排列)
  2. 保持最后一个保持第一(根据行顺序)
  3. 随机?

p.s.假设在分布式环境纱(未掌握本地)

回答

4

TL; DR保持优先(根据行顺序)

dropDuplicates操作者在火花SQL creates a logical plan with Deduplicate operator

Deduplicate操作is translated to First logical operator火花SQL的催化剂优化,这很好地回答你的问题(!)

你可以看到Deduplicate运营商在下面的逻辑计划。

// create datasets with duplicates 
val dups = spark.range(9).map(_ % 3) 

val q = dups.dropDuplicates 

以下是q数据集的逻辑计划。然后

scala> println(q.queryExecution.logical.numberedTreeString) 
00 Deduplicate [value#64L], false 
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L] 
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint 
03  +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long 
04   +- Range (0, 9, step=1, splits=Some(8)) 

Deduplicate操作被转换为First逻辑运算符(即示出了本身作为优化后Aggregate操作者)。

scala> println(q.queryExecution.optimizedPlan.numberedTreeString) 
00 Aggregate [value#64L], [value#64L] 
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L] 
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint 
03  +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long 
04   +- Range (0, 9, step=1, splits=Some(8)) 

花一些时间审查阿帕奇星火的代码后,我似乎已说服自己,dropDuplicates运营商正是groupBy其次first功能(!)

第一(columnName:String,ignoreNulls:Boolean):列集合函数:返回组中列的第一个值。

import org.apache.spark.sql.functions.first 
val firsts = dups.groupBy("value").agg(first("value") as "value") 
scala> println(firsts.queryExecution.logical.numberedTreeString) 
00 'Aggregate [value#64L], [value#64L, first('value, false) AS value#139] 
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L] 
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint 
03  +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long 
04   +- Range (0, 9, step=1, splits=Some(8)) 

scala> firsts.explain 
== Physical Plan == 
*HashAggregate(keys=[value#64L], functions=[first(value#64L, false)]) 
+- Exchange hashpartitioning(value#64L, 200) 
    +- *HashAggregate(keys=[value#64L], functions=[partial_first(value#64L, false)]) 
     +- *SerializeFromObject [input[0, bigint, false] AS value#64L] 
     +- *MapElements <function1>, obj#63: bigint 
      +- *DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long 
       +- *Range (0, 9, step=1, splits=8) 

可疑dropDuplicates操作者可以是更好的性能(如groupBy通常是中可能的解决方案的最慢)。

+0

似乎是一个潜在的性能改进是有一个无序/随机dropDuplicates选项,即没有执行第一个 – Qmage

+0

@Qmage我不知道如果第一个需要排序。我很怀疑。感谢您的发现。感谢您接受它作为答案!赞赏。 –

+0

@JacekLaskowski你想知道如何选择一个随机值而不是第一个值吗? – belka

相关问题