在spark DF中使用dropDuplicates函数时将保留哪一行?这在火花文档中没有说明。dropDuplicates操作符中使用了哪些行?
- (按行排列)
- 保持最后一个保持第一(根据行顺序)
- 随机?
p.s.假设在分布式环境纱(未掌握本地)
在spark DF中使用dropDuplicates函数时将保留哪一行?这在火花文档中没有说明。dropDuplicates操作符中使用了哪些行?
p.s.假设在分布式环境纱(未掌握本地)
TL; DR保持优先(根据行顺序)
dropDuplicates
操作者在火花SQL creates a logical plan with Deduplicate
operator。
这Deduplicate
操作is translated to First
logical operator火花SQL的催化剂优化,这很好地回答你的问题(!)
你可以看到Deduplicate
运营商在下面的逻辑计划。
// create datasets with duplicates
val dups = spark.range(9).map(_ % 3)
val q = dups.dropDuplicates
以下是q
数据集的逻辑计划。然后
scala> println(q.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#64L], false
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
Deduplicate
操作被转换为First
逻辑运算符(即示出了本身作为优化后Aggregate
操作者)。
scala> println(q.queryExecution.optimizedPlan.numberedTreeString)
00 Aggregate [value#64L], [value#64L]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
花一些时间审查阿帕奇星火的代码后,我似乎已说服自己,dropDuplicates
运营商正是groupBy
其次first功能(!)
第一(columnName:String,ignoreNulls:Boolean):列集合函数:返回组中列的第一个值。
import org.apache.spark.sql.functions.first
val firsts = dups.groupBy("value").agg(first("value") as "value")
scala> println(firsts.queryExecution.logical.numberedTreeString)
00 'Aggregate [value#64L], [value#64L, first('value, false) AS value#139]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
scala> firsts.explain
== Physical Plan ==
*HashAggregate(keys=[value#64L], functions=[first(value#64L, false)])
+- Exchange hashpartitioning(value#64L, 200)
+- *HashAggregate(keys=[value#64L], functions=[partial_first(value#64L, false)])
+- *SerializeFromObject [input[0, bigint, false] AS value#64L]
+- *MapElements <function1>, obj#63: bigint
+- *DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
+- *Range (0, 9, step=1, splits=8)
我可疑该dropDuplicates
操作者可以是更好的性能(如groupBy
通常是中可能的解决方案的最慢)。
似乎是一个潜在的性能改进是有一个无序/随机dropDuplicates选项,即没有执行第一个 – Qmage
@Qmage我不知道如果第一个需要排序。我很怀疑。感谢您的发现。感谢您接受它作为答案!赞赏。 –
@JacekLaskowski你想知道如何选择一个随机值而不是第一个值吗? – belka