2016-09-15 126 views
-1

我想比较两个文件,如果没有匹配额外的记录加载到另一个文件与不匹配的记录。 同时比较文件和记录计数中的每个字段。如何使用spark比较两个文件?

+1

什么是文件的结构? –

+0

这是CSV格式 – Nathon

+0

它们是什么模式?什么是比较栏?有没有任何限制? –

回答

3

比方说,你有两个文件:

scala> val a = spark.read.option("header", "true").csv("a.csv").alias("a"); a.show 
+---+-----+ 
|key|value| 
+---+-----+ 
| a| b| 
| b| c| 
+---+-----+ 

a: org.apache.spark.sql.DataFrame = [key: string, value: string] 

scala> val b = spark.read.option("header", "true").csv("b.csv").alias("b"); b.show 
+---+-----+ 
|key|value| 
+---+-----+ 
| b| c| 
| c| d| 
+---+-----+ 

b: org.apache.spark.sql.DataFrame = [key: string, value: string] 

目前还不清楚哪种类型的您正在寻找匹配的记录,但它很容易通过任何定义与join找到他们:

scala> a.join(b, Seq("key")).show 
+---+-----+-----+ 
|key|value|value| 
+---+-----+-----+ 
| b| c| c| 
+---+-----+-----+ 

scala> a.join(b, Seq("key"), "left_outer").show 
+---+-----+-----+ 
|key|value|value| 
+---+-----+-----+ 
| a| b| null| 
| b| c| c| 
+---+-----+-----+ 

scala> a.join(b, Seq("key"), "right_outer").show 
+---+-----+-----+ 
|key|value|value| 
+---+-----+-----+ 
| b| c| c| 
| c| null| d| 
+---+-----+-----+ 

scala> a.join(b, Seq("key"), "outer").show 
+---+-----+-----+ 
|key|value|value| 
+---+-----+-----+ 
| c| null| d| 
| b| c| c| 
| a| b| null| 
+---+-----+-----+ 

如果您正在寻找b.csva.csv中不存在的记录:

scala> val diff = a.join(b, Seq("key"), "right_outer").filter($"a.value" isNull).drop($"a.value") 
scala> diff.show 
+---+-----+ 
|key|value| 
+---+-----+ 
| c| d| 
+---+-----+ 

scala> diff.write.csv("diff.csv") 
+0

感谢Daniel的回复,对我非常有帮助。 – Nathon