2017-03-18 55 views
0

的补体我有两个RDD的: 第一个(用户ID,MOV ID,评分,时间戳)加入RDD的以导致相交

data_wo_header: RDD[String] 
scala> data_wo_header.take(5).foreach(println) 
1,2,3.5,1112486027 
1,29,3.5,1112484676 
1,32,3.5,1112484819 
1,47,3.5,1112484727 
1,50,3.5,1112484580 

和RDD2(用户ID,MOV ID)

data_test_wo_header: RDD[String] 
scala> data_test_wo_header.take(5).foreach(println) 
1,2 
1,367 
1,1009 
1,1525 
1,1750 

我需要加入两个RDD,这样加入会删除RDD1中常见的条目(UserID,Mov ID)。 有人可以指导两个RDD的scala-spark连接。 另外,我需要一个连接,其中从RDD1派生的新RDD只有公共项目。

回答

0

首先转换您的RDDS到数据帧数据帧,因为有类似的API常用的SQL如加入,选择等

要将RDDS转换成数据帧,你需要一个RDD [行]代替RDD [字符串。

Import sqlContext.implicits._ 

case class cs1(UserID: Int, MovID: Int, Rating: String, Timestamp: String) 

case class cs2(UserID: Int, MovID: Int) 

val df1 = data_wo_header.map(row => { 
    val splits = row.split(",") 

    cs1(splits(0).toInt, splits(1).toInt, splits(2),splits(3)) 
}).toDF("UserID", "MovID", "Rating", "Timestamp") 

val df2 = data_test_wo_header.map(row => { 
    val splits = row.split(",") 

    cs2(splits(0).toInt, splits(1).toInt) 
}).toDF("UserID", "MovID") 

现在,添加一个新列DF2,

val df2Prime = df2.withColumn("isPresent", lit(1)) 

然后左加入df2Prime与DF1,并过滤掉行,其中isPresent是1,你有相交的结果。另外,临时丢弃isPresent标志。

val temp = df1.join(df2Prime, usingColumns = Seq("UserID", "MovID"), "left") 

temp.filter(temp("isPresent") =!= "1").drop("isPresent") 
0

超级简单的方法是使用键减法。以下是为我工作:

val data_wo_header=dropheader(data).map(_.split(",")).map(x=>((x(0),x(1)),(x(2),x(3)))) 
val data_test_wo_header=dropheader(data_test).map(_.split(",")).map(x=>((x(0),x(1)),1)) 
val ratings_train=data_wo_header.subtractByKey(data_test_wo_header) 
val ratings_test=data_wo_header.subtractByKey(ratings_train)