2017-07-31 6398 views
3

我有一个DataFrame,DataFrame hava两列'value'和'timestamp','timestmp'是有序的,我想得到DataFrame的最后一行,我该怎么办?如何从DataFrame获取最后一行?

这是我输入:

+-----+---------+ 
|value|timestamp| 
+-----+---------+ 
| 1|  1| 
| 4|  2| 
| 3|  3| 
| 2|  4| 
| 5|  5| 
| 7|  6| 
| 3|  7| 
| 5|  8| 
| 4|  9| 
| 18|  10| 
+-----+---------+ 

这是我的代码:

val arr = Array((1,1),(4,2),(3,3),(2,4),(5,5),(7,6),(3,7),(5,8),(4,9),(18,10)) 
    var df=m_sparkCtx.parallelize(arr).toDF("value","timestamp") 

这是我预期的结果:

+-----+---------+ 
|value|timestamp| 
+-----+---------+ 
| 18|  10| 
+-----+---------+ 
+0

请问'df.where($ “时间戳” === MAX($ “时间戳”)'工作? –

+0

它亘古不变的工作交流rangepartitioning(TS# 7 ASC NULLS FIRST,200) – mentongwu

回答

3

我想简单地reduce

df.reduce { (x, y) => 
    if (x.getAs[Int]("timestamp") > y.getAs[Int]("timestamp")) x else y 
} 
1

如果timestamp列是独一无二的,是递增顺序然后有以下方法得到最后一行

println(df.sort($"timestamp", $"timestamp".desc).first()) 

// Output [1,1] 

df.sort($"timestamp", $"timestamp".desc).take(1).foreach(println) 

// Output [1,1] 

df.where($"timestamp" === df.count()).show 

输出:

+-----+---------+ 
|value|timestamp| 
+-----+---------+ 
| 18|  10| 
+-----+---------+ 

如果没有创建索引的新列并选择最后一个指标如下

val df1 = spark.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map { 
    case (row, index) => Row.fromSeq(row.toSeq :+ index) 
}, 
StructType(df.schema.fields :+ StructField("index", LongType, false))) 

df1.where($"timestamp" === df.count()).drop("index").show 

输出:

+-----+---------+ 
|value|timestamp| 
+-----+---------+ 
| 18|  10| 
+-----+---------+ 
+0

排序功能效率低下,我不想使用排序功能 – mentongwu

+0

比你可以使用df.where($“timestamp”=== df.count()) –

1

最有效的方法是到你的DataFrame中reduce。这给你一个你可以转换回DataFrame的单行,但由于它只包含1条记录,所以这没什么意义。

sparkContext.parallelize(
    Seq(
    df.reduce { 
    (a, b) => if (a.getAs[Int]("timestamp") > b.getAs[Int]("timestamp")) a else b 
    } match {case Row(value:Int,timestamp:Int) => (value,timestamp)} 
) 
) 
.toDF("value","timestamp") 
.show 


+-----+---------+ 
|value|timestamp| 
+-----+---------+ 
| 18|  10| 
+-----+---------+ 

效率较低(因为它需要改组)虽然短是这样的解决方案:

df 
.where($"timestamp" === df.groupBy().agg(max($"timestamp")).map(_.getInt(0)).collect.head) 
0

是 我会简单地使用查询 - 订单表格由降序排列 - 来自这需要1个值为了

df.createOrReplaceTempView("table_df") 
query_latest_rec = """SELECT * FROM table_df ORDER BY value DESC limit 1""" 
latest_rec = self.sqlContext.sql(query_latest_rec) 
latest_rec.show() 
相关问题