2017-09-14 78 views
0
**DF1** **DF2**   **output_DF** 
120 D  A    120 null A 
120 E  B    120 null B 
125 F  C    120 null C 
      D    120 D D 
      E    120 E E 
      F    120 null F 
      G    120 null G 
      H    120 null H 
          125 null A 
          125 null B 
          125 null C 
          125 null D 
          125 null E 
          125 F F 
          125 null G 
          125 null H 

从数据帧1和数据帧2中需要获取spark-shell中的最终输出数据帧。 其中A,B,C,D,E,F采用日期格式(yyyy-MM-dd)& 120,125是有成千上万个ticket_id的ticket_id列。 我刚刚在这里提取了一个。如何使用scala获得此信息

+0

您也可以查看'df.join()'函数和可能''df.na.fill()'。 – Shaido

+0

请您详细说明一下... !! – maduri

+0

[加入密钥上的Spark数据帧]的可能重复(https://stackoverflow.com/questions/40343625/joining-spark-dataframes-on-the-key) – Harald

回答

0

为了让你可以使用df.join()df.na.fill()(如在评论中提到)预期的结果,就像这样:

火花2.0+

val resultDF = df1.select("col1").distinct.collect.map(_.getInt(0)).map(id => df1.filter(s"col1 = $id").join(df2, df1("col2") === df2("value"), "right").na.fill(id)).reduce(_ union _) 

火花1.6

val resultDF = df1.select("col1").distinct.collect.map(_.getInt(0)).map(id => df1.filter(s"col1 = $id").join(df2, df1("col2") === df2("value"), "right").na.fill(id)).reduce(_ unionAll _) 

它会给你以下结果 -

+---+----+-----+ 
|120|null| A| 
|120|null| B| 
|120|null| C| 
|120| D| D| 
|120| E| E| 
|120|null| F| 
|120|null| G| 
|120|null| H| 
|125|null| A| 
|125|null| B| 
|125|null| C| 
|125|null| D| 
|125|null| E| 
|125| F| F| 
|125|null| G| 
|125|null| H| 
+---+----+-----+ 

我希望它有帮助!

+0

如果列中有N个ID,值只在几个日期,当加入独特的日期DF2我们需要得到上述output_DF。为了更清晰地编辑问题。 @himanshulllTian – maduri

+0

@maduri - 我已根据编辑的问题更新了我的答案。 – himanshuIIITian

+0

:65:error:value union不是org.apache.spark.sql.DataFrame的成员 – maduri

0

全部加入可能的值,然后留下原始数据帧加入:

import hiveContext.implicits._ 
val df1Data = List((120, "D"), (120, "E"), (125, "F")) 
val df2Data = List("A", "B", "C", "D", "E", "F", "G", "H") 
val df1 = sparkContext.parallelize(df1Data).toDF("id", "date") 
val df2 = sparkContext.parallelize(df2Data).toDF("date") 

// get unique ID: 120, 125 
val uniqueIDDF = df1.select(col("id")).distinct() 
val fullJoin = uniqueIDDF.join(df2) 
val result = fullJoin.as("full").join(df1.as("df1"), col("full.id") === col("df1.id") && col("full.date") === col("df1.date"), "left_outer") 

val sorted = result.select(col("full.id"), col("df1.date"), col("full.date")).sort(col("full.id"), col("full.date")) 
sorted.show(false) 

输出:

+---+----+----+ 
|id |date|date| 
+---+----+----+ 
|120|null|A | 
|120|null|B | 
|120|null|C | 
|120|D |D | 
|120|E |E | 
|120|null|F | 
|120|null|G | 
|120|null|H | 
|125|null|A | 
|125|null|B | 
|125|null|C | 
|125|null|D | 
|125|null|E | 
|125|F |F | 
|125|null|G | 
|125|null|H | 
+---+----+----+ 

排序这里只是为了显示同样的结果,可以跳过。