2016-12-15 475 views
2

我有一个相对简单的问题。Spark分割RDD分块和连接

我有一个大的Spark RDD [String](包含JSON)。在我的用例中,我想将N个字符串分组(连接)为一个新的RDD [String],以便它的大小为oldRDD.size/N。

伪例如:

val oldRDD : RDD[String] = ['{"id": 1}', '{"id": 2}', '{"id": 3}', '{"id": 4}'] 

val newRDD : RDD[String] = someTransformation(oldRDD, ",", 2) 
newRDD = ['{"id": 1},{"id": 2}','{"id": 3},{"id": 4}'] 

val anotherRDD : RDD[String] = someTransformation(oldRDD, ",", 3) 
anotherRDD = ['{"id": 1},{"id": 2},{"id": 3}','{"id": 4}'] 

我已经找了一个类似的案件,但无法找到任何东西。

谢谢!

回答

2

在这里你必须使用zipWithIndex函数,然后计算组。

例如,index = 3和n(组数)= 2给出第二组。 3/2 = 1(整数除法),所以基于0的第二组

val n = 3; 
val newRDD1 = oldRDD.zipWithIndex() // creates tuples (element, index) 
    // map to tuple (group, content) 
    .map(x => (x._2/n, x._1)) 
    // merge 
    .reduceByKey(_ + ", " + _) 
    // remove key 
    .map(x => x._2) 

一个注: “zipWithIndex” 的顺序是内部订单。它在业务逻辑中可能没有意义,您必须检查您的情况下订单是否正常。如果不是,请对RDD进行分类,然后使用zipWithIndex

+0

这是一个很好的答案!但是在这种情况下'n'不会是组数;相反,它是组的*大小*。如果你希望'n'是组的*号*,你需要使用模运算符而不是除法,并且注意元素的排序不会被保留。 – vaerek