2017-10-06 43 views
0

我有数据框。我需要根据每个Id的updateTableTimestamp表中最新的记录。 df.show()如何使用上次时间戳从数据框中选择不同的记录

+--------------------+-----+-----+--------------------+ 
|   Description| Name| id |updateTableTimestamp| 
+--------------------+-----+-----+--------------------+ 
|     | 042F|64185|  1507306990753| 
|     | 042F|64185|  1507306990759| 
|Testing    |042MF| 941|  1507306990753| 
|     | 058F| 8770|  1507306990753| 
|Testing 3   |083MF|31663|  1507306990759| 
|Testing 2   |083MF|31663|  1507306990753| 
+--------------------+-----+-----+--------------------+ 

需要输出

+--------------------+-----+-----+--------------------+ 
|   Description| Name| id |updateTableTimestamp| 
+--------------------+-----+-----+--------------------+ 
|     | 042F|64185|  1507306990759| 
|Testing    |042MF| 941|  1507306990753| 
|     | 058F| 8770|  1507306990753| 
|Testing 3   |083MF|31663|  1507306990759| 
+--------------------+-----+-----+--------------------+ 

我已经试过

sqlContext.sql("SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY Id ORDER BY updateTableTimestamp DESC) rank from temptable) tmp where rank = 1") 

它给出了分区错误。在线程异常 “主” java.lang.RuntimeException: [1.29] failure: ``union'' expected but(” found`I现在用火花1.6.2

+1

“它给错误” - 错误是什么? – FuzzyTree

+1

尝试'where tmp.rank = 1'或尝试使用与'rank'不同的别名,因为它是保留字。 – Simon

+0

不支持PARTITION – lucy

回答

0
import org.apache.spark.sql.functions.first 
import org.apache.spark.sql.functions.desc 
import org.apache.spark.sql.functions.col 

val dfOrder = df.orderBy(col("id"), col("updateTableTimestamp").desc) 

val dfMax = dfOrder.groupBy(col("id")). 
      agg(first("description").as("description"), 
       first("name").as("name"), 
       first("updateTableTimestamp").as("updateTableTimestamp")) 
     dfMax.show 

enter image description here

在此之后,如果你想重新整理你的领域,只是应用塞莱对您的新DF进行编号功能。

+0

太棒了!埃里克巴拉哈斯谢谢 – lucy

0

选择 说明,姓名,身份证,updateTableTimestamp 从table_name的 其中id在 (从TABLE_NAME组由updateTableTimestamp选择ID),以便通过updateTableTimestamp递减;

相关问题