
I want to filter a source RDD on one of its columns: how can I get the effect of a SQL IN-clause filter condition in Spark without actually using an IN clause?

val source = sql("SELECT * from sample.source").rdd.map(_.mkString(",")) 
val destination = sql("select * from sample.destination").rdd.map(_.mkString(",")) 

val source_primary_key = source.map(rec => (rec.split(",")(0))) 
val destination_primary_key = destination.map(rec => (rec.split(",")(0))) 

// keys present in source but not in destination
// (subtract, since these are plain RDD[String]s rather than key/value pairs)
val src = source_primary_key.subtract(destination_primary_key) 

I want to use a filter condition to keep only the rows of source whose key exists in src, something like the following (edited):

val source = spark.read.csv(inputPath + "/source").rdd.map(_.mkString(",")) 
val destination = spark.read.csv(inputPath + "/destination").rdd.map(_.mkString(",")) 

val source_primary_key = source.map(rec => (rec.split(",")(0))) 
val destination_primary_key = destination.map(rec => (rec.split(",")(0))) 

// pseudocode: this does not compile, it only illustrates the intended filter
val extra_in_source = source_primary_key.filter(rec._1 != destination_primary_key._1) 

The equivalent SQL would be:

SELECT * FROM SOURCE WHERE ID IN (select ID from src) 
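
For illustration, here is a minimal sketch (an editorial addition, not part of the original post) of how such an IN-style filter could be expressed on the RDDs above, assuming source and src are the RDD[String] values defined earlier; the names sourceByKey, srcKeys and filtered are made up for the example:

// key each source row by its first column 
val sourceByKey = source.map(rec => (rec.split(",")(0), rec)) 
// key the "IN list" (the keys held in src) with a dummy value so it can be joined 
val srcKeys = src.map(k => (k, ())) 
// an inner join keeps only the source rows whose key appears in src 
val filtered = sourceByKey.join(srcKeys).map { case (_, (row, _)) => row } 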

Thanks.

What type are your values? – eliasah

The data type can vary, sometimes INT and sometimes String – Vignesh

That's not what I asked. What is the type of 'src' or 'source'? Are you using RDDs or DataFrames? – eliasah

Answers

Since your code isn't reproducible, here is a small example of how to do select * from t where id in (...) with spark-sql:

// create a DataFrame for a range 'id' from 1 to 9. 
scala> val df = spark.range(1,10).toDF 
df: org.apache.spark.sql.DataFrame = [id: bigint] 

// values to exclude 
scala> val f = Seq(5,6,7) 
f: Seq[Int] = List(5, 6, 7) 

// select * from df where id is not in the values to exclude 
scala> df.filter(!col("id").isin(f : _*)).show 
+---+                   
| id| 
+---+ 
| 1| 
| 2| 
| 3| 
| 4| 
| 8| 
| 9| 
+---+ 

// select * from df where id is in the values to exclude 
scala> df.filter(col("id").isin(f : _*)).show 

And here is the RDD version of not isin:

scala> val rdd = sc.parallelize(1 to 10) 
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24 

scala> val f = Seq(5,6,7) 
f: Seq[Int] = List(5, 6, 7) 

scala> val rdd2 = rdd.filter(x => !f.contains(x)) 
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at filter at <console>:28 
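
In the question, though, the keys to exclude live in another RDD rather than in a local Seq. In that case the same "not in" can be expressed with subtract; a minimal sketch, reusing rdd from above (the names excludeRdd and rdd3 are just illustrative):

// keys to exclude, held in an RDD instead of a local Seq 
val excludeRdd = sc.parallelize(Seq(5, 6, 7)) 

// "rdd not in excludeRdd": subtract drops every element that also occurs in excludeRdd 
val rdd3 = rdd.subtract(excludeRdd) 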

Nevertheless, I still think this is overkill, since you are already using spark-sql.

In your case you are actually dealing with DataFrames, so the solutions mentioned above won't work.

You can use the left anti join approach:

scala> val source = spark.read.format("csv").load("source.file") 
source: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 9 more fields] 

scala> val destination = spark.read.format("csv").load("destination.file") 
destination: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 9 more fields] 

scala> source.show 
+---+------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+ 
|_c0|    _c1|  _c2|  _c3|   _c4|_c5|_c6|  _c7| _c8|  _c9|  _c10| 
+---+------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+ 
| 1|  Ravi kumar| Ravi |  kumar|   MSO | 1| M|17-01-1994| 74.5| 24000.78| Alabama | 
| 2|Shekhar shudhanshu| Shekhar|shudhanshu|  Manulife | 2| M|18-01-1994|76.34| 250000|  Alaska | 
| 3|Preethi Narasingam| Preethi|Narasingam|  Retail | 3| F|19-01-1994|77.45|270000.01| Arizona | 
| 4|  Abhishek Nair|Abhishek|  Nair|  Banking | 4| M|20-01-1994|78.65| 345000| Arkansas | 
| 5|  Ram Sharma|  Ram| Sharma|Infrastructure | 5| M|21-01-1994|79.12| 45000| California | 
| 6| Chandani Kumari|Chandani| Kumari|   BNFS | 6| F|22-01-1994|80.13| 43000.02| Colorado | 
| 7|  Balaji Kumar| Balaji|  Kumar|   MSO | 1| M|23-01-1994|81.33| 1234678|Connecticut | 
| 8| Naveen Shekrappa| Naveen| Shekrappa|  Manulife | 2| M|24-01-1994| 100| 789414| Delaware | 
| 9|  Milind Chavan| Milind| Chavan|  Retail | 3| M|25-01-1994|83.66| 245555| Florida | 
| 10|  Raghu Rajeev| Raghu| Rajeev|  Banking | 4| M|26-01-1994|87.65| 235468|  Georgia| 
+---+------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+ 


scala> destination.show 
+---+-------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+ 
|_c0|    _c1|  _c2|  _c3|   _c4|_c5|_c6|  _c7| _c8|  _c9|  _c10| 
+---+-------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+ 
| 1|   Ravi kumar| Revi |  kumar|   MSO | 1| M|17-01-1994| 74.5| 24000.78| Alabama | 
| 1|  Ravi1 kumar| Revi |  kumar|   MSO | 1| M|17-01-1994| 74.5| 24000.78| Alabama | 
| 1|  Ravi2 kumar| Revi |  kumar|   MSO | 1| M|17-01-1994| 74.5| 24000.78| Alabama | 
| 2| Shekhar shudhanshu| Shekhar|shudhanshu|  Manulife | 2| M|18-01-1994|76.34| 250000|  Alaska | 
| 3|Preethi Narasingam1| Preethi|Narasingam|  Retail | 3| F|19-01-1994|77.45|270000.01| Arizona | 
| 4|  Abhishek Nair1|Abhishek|  Nair|  Banking | 4| M|20-01-1994|78.65| 345000| Arkansas | 
| 5|   Ram Sharma|  Ram| Sharma|Infrastructure | 5| M|21-01-1994|79.12| 45000| California | 
| 6| Chandani Kumari|Chandani| Kumari|   BNFS | 6| F|22-01-1994|80.13| 43000.02| Colorado | 
| 7|  Balaji Kumar| Balaji|  Kumar|   MSO | 1| M|23-01-1994|81.33| 1234678|Connecticut | 
| 8| Naveen Shekrappa| Naveen| Shekrappa|  Manulife | 2| M|24-01-1994| 100| 789414| Delaware | 
| 9|  Milind Chavan| Milind| Chavan|  Retail | 3| M|25-01-1994|83.66| 245555| Florida | 
| 10|  Raghu Rajeev| Raghu| Rajeev|  Banking | 4| M|26-01-1994|87.65| 235468|  Georgia| 
+---+-------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+ 

You just need to do the following:

scala> val res1 = source.join(destination, Seq("_c0"), "leftanti") 

scala> val res2 = destination.join(source, Seq("_c0"), "leftanti") 
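
For reference, the same anti join can also be written in plain Spark SQL (a sketch, assuming the two DataFrames are first registered as temporary views; the view names and res3 are illustrative):

source.createOrReplaceTempView("source") 
destination.createOrReplaceTempView("destination") 

// rows of source whose _c0 has no match in destination 
val res3 = spark.sql("SELECT s.* FROM source s LEFT ANTI JOIN destination d ON s._c0 = d._c0") 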

This is the same logic that I mentioned in my answer here.

@eliasah, I am using RDDs, not DataFrames – Vignesh

I am looking for something like val extra_in_source = source_primary_key.filter(rec != destination_primary_key._1) – Vignesh

I'm still not convinced that you can't use DataFrames. Let me update my answer anyway – eliasah