2017-06-29

Beginner here. I am using Spark 2.1.1 and Scala 2.11.8. How do I map one column of an RDD against other columns of (a) the same RDD and (b) a different RDD?

I have an RDD with six columns. This is the first entry of the RDD:

(String, String, String, String, String, String) = (" p69465323_serv80i"," 7 "," fb_406423006398063"," guest_861067032060185_android"," fb_100000829486587"," fb_100007900293502") 

The actual RDD has more than 5 million entries.

I want to map the first column against the third, fourth, fifth, and sixth columns so that I get individual pairs like this:

(fb_406423006398063, p69465323_serv80i) 
(guest_861067032060185_android, p69465323_serv80i) 
(fb_100000829486587, p69465323_serv80i) 
(fb_100007900293502, p69465323_serv80i) 

That is, the first column is mapped separately against the third, fourth, fifth, and sixth columns. How do I do this (a) within the same RDD and (b) across a different RDD?

Answers


Assuming you have an array of tuples where each element looks like this:

(" p69465323_serv80i"," 7 "," fb_406423006398063"," guest_861067032060185_android"," fb_100000829486587"," fb_100007900293502") 

You can use the following approach:

val rdd = sc.parallelize(Array((" p69465323_serv80i"," 7 "," fb_406423006398063"," guest_861067032060185_android"," fb_100000829486587"," fb_100007900293502")))
// flatMap emits one (column, firstColumn) pair per target column,
// giving the individual pairs requested in the question
val pairedRdd = rdd.flatMap(x => Seq((x._3, x._1), (x._4, x._1), (x._5, x._1), (x._6, x._1)))
pairedRdd.collect
// Array[(String, String)] = Array((" fb_406423006398063"," p69465323_serv80i"),
//   (" guest_861067032060185_android"," p69465323_serv80i"),
//   (" fb_100000829486587"," p69465323_serv80i"),
//   (" fb_100007900293502"," p69465323_serv80i"))

Thanks! But I have a question here: does the same solution work across two different RDDs? This relates to a question I posted earlier today: https://stackoverflow.com/questions/44819655/how-to-perform-set-transformations-on-rdds-with-different-number-of-columns – PixieDev


@AviAggarwal Do you mean mapping columns from one RDD to another RDD? No, you can't do that directly. The solution provided in that question looks fine; as suggested there, your RDDs need to be of the same type. – philantrovert
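Since columns of one RDD cannot be mapped directly onto another RDD, the usual workaround is to key both datasets on a shared column and join them. Below is a minimal sketch of that idea using plain Scala collections (Spark's pair-RDD `join` produces the same `(key, (left, right))` shape on `RDD[(K, V)]`); the keys and second dataset here are hypothetical:

```scala
// Two datasets keyed on a hypothetical shared id column.
val left  = Seq(("id1", "p69465323_serv80i"), ("id2", "p12345_serv01"))
val right = Seq(("id1", "fb_406423006398063"), ("id2", "guest_861067032060185_android"))

// Inner join on the key, mirroring rddA.join(rddB) on pair RDDs:
// for each key present in both sides, emit (key, (leftValue, rightValue)).
val joined = for {
  (kL, vL) <- left
  (kR, vR) <- right
  if kL == kR
} yield (kL, (vL, vR))
```

With pair RDDs the equivalent would be `leftRdd.join(rightRdd)`, which requires both RDDs to be of type `RDD[(K, V)]` with the same key type, as the comment above notes.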

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._

// yourRDD is assumed to be an RDD[String] of comma-separated lines, e.g.
// " p69465323_serv80i, 7 , fb_406423006398063, guest_861067032060185_android, fb_100000829486587, fb_100007900293502"
val newDF = yourRDD
    .map(_.split(","))
    .map(attributes => YourModelClass(attributes(0), attributes(1),
      attributes(2), attributes(3), attributes(4), attributes(5)))
    .toDF()
// Register the DataFrame as a temporary view
newDF.createOrReplaceTempView("DFTable")

// Pair the first column with the third, fourth, ... columns, as asked
val firstDF = spark.sql("SELECT thirdCol, firstCol FROM DFTable")
val secondDF = spark.sql("SELECT fourthCol, firstCol FROM DFTable")
// val thirdDF = ... etc.

Declare YourModelClass as a class or case class with the fields firstCol, secondCol, ..., sixthCol.
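The answer leaves YourModelClass undefined; a minimal sketch, assuming six string columns matching the question's data (the class and field names are the answer's hypothetical ones):

```scala
// Hypothetical model class with one String field per input column.
case class YourModelClass(firstCol: String, secondCol: String,
                          thirdCol: String, fourthCol: String,
                          fifthCol: String, sixthCol: String)

// The fields then become addressable by name (and as DataFrame columns via toDF):
val row = YourModelClass("p69465323_serv80i", "7", "fb_406423006398063",
                         "guest_861067032060185_android", "fb_100000829486587",
                         "fb_100007900293502")
```

A case class is the idiomatic choice here because Spark can derive an encoder for it automatically via `spark.implicits._`.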

I hope this helps.
