2017-03-28 236 views
1

假设,我有一个数据帧象下面这样:如何根据spark数据框中的某些列过滤掉重复的行?

enter image description here

在这里,你可以看到,事务处理编号1,2和3具有相同的值用于列A,B,C,但对于列d不同的值和E. E列有日期条目。

  1. 对于相同的A,B和C组合(A = 1,B = 1,C = 1),我们有3行。基于E列的最近交易日期,我想只取一行表示具有最近日期的行。但对于最近的日期,有2个交易。但是,如果在A列中找到A,B,C和最近日期的相同组合的两个或更多行,我想只取其中一个。 因此,我对该组合的预期产出将为行编号3或4 (任何人都会这样做)。
  2. 对于相同的A,B和C组合(A = 2,B = 2,C = 2),我们有2行。但是根据列E,最近的日期是行号5的日期。因此,我们将这一行作为A,B和C的这种组合。 因此,我对该组合的预期输出将是行号

所以最终的输出将是(3和5)或(4和5)

现在我应该怎么处理方法:

  1. 我这样说的:

两个reduceByKey和groupByKey可用于同一目的,但 reduceByKey工作在一个大的要好得多数据集。这是因为在 混洗数据之前,Spark 知道它可以将输出与每个分区上的公用密钥组合在一起。

  • 我试图与GROUPBY上的列A,B,C和最大上柱E.但它不能给我的行的头,如果存在的多行同一日期。
  • 什么是最优化的方法来解决这个问题?提前致谢。

    编辑:我需要找回我过滤的交易。如何做到这一点呢?

    +0

    什么版本的火花您使用的是? – eliasah

    +0

    方法2更好。函数“max”总是会返回一个最大日期。如果存在多个这样的日期,则只会选择一个。 – pasha701

    +0

    我正在使用spark-2.1.0版本 –

    回答

    0

    我已经使用spark window functions让我的解决方案:

    val window = Window 
         .partitionBy(dataframe("A"), dataframe("B"),dataframe("C")) 
         .orderBy(dataframe("E") desc) 
    
    val dfWithRowNumber = dataframe.withColumn("row_number", row_number() over window) 
    val filteredDf = dfWithRowNumber.filter(dfWithRowNumber("row_number") === 1) 
    
    +0

    这是保证做全面洗牌。您可能希望将它与实现相同的pairRDD/reduceByKey选项进行比较。 – sourabh

    0

    链接可能通过几个步骤。 Agregated数据框中:

    val agregatedDF=initialDF.select("A","B","C","E").groupBy("A","B","C").agg(max("E").as("E_max")) 
    

    链接INTIAL-agregated:

    initialDF.join(agregatedDF, List("A","B","C")) 
    

    如果初始数据帧来自蜂巢,都可以简化。

    0
    val initialDF = Seq((1,1,1,1,"2/28/2017 0:00"),(1,1,1,2,"3/1/2017 0:00"), 
    (1,1,1,3,"3/1/2017 0:00"),(2,2,2,1,"2/28/2017 0:00"),(2,2,2,2,"2/25/20170:00")) 
    

    这将错过相应的山坳(d)

    initialDF 
    .toDS.groupBy("_1","_2","_3") 
    .agg(max(col("_5"))).show 
    

    如果你想为最大山坳对应的冷:

    initialDF.toDS.map(x=>x._1,x._2,x._3,x._5,x._4))).groupBy("_1","_2","_3") 
    .agg(max(col("_4")).as("_4")).select(col("_1"),col("_2"),col("_3"),col("_4._2"),col("_4._1")).show 
    

    对于ReduceByKey你可以转换数据集以配对RDD,然后解决它。如果Catalyst无法优化第一个组中的groupByKey,应该更快。请参阅Rolling your own reduceByKey in Spark Dataset

    相关问题