2017-06-06 61 views
0

比方说,我们有一个dataset/dataframe在星火其中有3列 IDWordTimestampUDAF合并哪里是在Spark DataSet中第一orderdby /行数据帧

我想在那里我可以写一个UDAF功能做这样的事情

df.show() 

ID | Word | Timestamp 
1 | I | "2017-1-1 00:01" 
1 | am | "2017-1-1 00:02" 
1 | Chris | "2017-1-1 00:03" 
2 | I | "2017-1-1 00:01" 
2 | am | "2017-1-1 00:02" 
2 | Jessica | "2017-1-1 00:03" 

val df_merged = df.groupBy("ID") 
    .sort("ID", "Timestamp") 
    .agg(custom_agg("ID", "Word", "Timestamp") 

df_merged.show 

ID | Words   | StartTime  |  EndTime  | 
1 | "I am Chris" | "2017-1-1 00:01" | "2017-1-1 00:03" | 
1 | "I am Jessica" | "2017-1-1 00:01" | "2017-1-1 00:03" | 

问题是如何确保该列Words会以正确的顺序进行合并我UDAF里面?

+0

列返回从'udaf'总是在数据框的列末尾。但是你可以用'select'来反正你想要的。 –

回答

0

这里是星火2的groupByKey(与非类型化的Dataset使用)的sollution groupByKey的.The好处是,你可以访问组(你在mapGroupsIterator[Row]):

df.groupByKey(r => r.getAs[Int]("ID")) 
     .mapGroups{case(id,rows) => { 
     val sorted = rows 
      .toVector 
      .map(r => (r.getAs[String]("Word"),r.getAs[java.sql.Timestamp]("Timestamp"))) 
      .sortBy(_._2.getTime) 

     (id, 
     sorted.map(_._1).mkString(" "), 
     sorted.map(_._2).head, 
     sorted.map(_._2).last 
     ) 
     } 
     }.toDF("ID","Words","StartTime","EndTime") 
+0

这个答案也是正确的。 但是,groupByKey解决方案比Window解决方案快很多,至少对于小型案例数据场景来说。 有什么特别的原因吗? – azouritis

0

对不起,我不使用斯卡拉,并希望你能读它。

Window功能,可以做你想做的:

df = df.withColumn('Words', f.collect_list(df['Word']).over(
    Window().partitionBy(df['ID']).orderBy('Timestamp').rowsBetween(start=Window.unboundedPreceding, 
                    end=Window.unboundedFollowing))) 

输出:

+---+-------+-----------------+----------------+         
| ID| Word|  Timestamp|   Words| 
+---+-------+-----------------+----------------+ 
| 1|  I|2017-1-1 00:01:00| [I, am, Chris]| 
| 1|  am|2017-1-1 00:02:00| [I, am, Chris]| 
| 1| Chris|2017-1-1 00:03:00| [I, am, Chris]| 
| 2|  I|2017-1-1 00:01:00|[I, am, Jessica]| 
| 2|  am|2017-1-1 00:02:00|[I, am, Jessica]| 
| 2|Jessica|2017-1-1 00:03:00|[I, am, Jessica]| 
+---+-------+-----------------+----------------+ 

然后groupBy上面的数据:

df = df.groupBy(df['ID'], df['Words']).agg(
    f.min(df['Timestamp']).alias('StartTime'), f.max(df['Timestamp']).alias('EndTime')) 
df = df.withColumn('Words', f.concat_ws(' ', df['Words'])) 

输出:

+---+------------+-----------------+-----------------+       
| ID|  Words|  StartTime|   EndTime| 
+---+------------+-----------------+-----------------+ 
| 1| I am Chris|2017-1-1 00:01:00|2017-1-1 00:03:00| 
| 2|I am Jessica|2017-1-1 00:01:00|2017-1-1 00:03:00| 
+---+------------+-----------------+-----------------+ 
+0

这是'python'不是'scala'代码,其次也许你可以向OP解释代码? – mtoto

+0

Thx,男人。代码非常相似,在Scala中,不用担心。 – azouritis