2017-04-10 152 views
0

我有一个包含这样Scala - 如何迭代RDD上的元组?

元组的RDD(A,列表(-2,5,6,7-))

(B,列表(2,8,9,10))

我想获得第一个元素的索引,其中值和索引之间的特定条件成立。 到目前为止,我已经试过这对一个元组测试,它工作得很好:

test._2.zipWithIndex.indexWhere { case (v, i) => SOME_CONDITION} 

我只是找不到如何遍历列表中的所有元组。我曾尝试:

val result= test._._2.zipWithIndex.indexWhere { case (v, i) => SOME_CONDITION} 

回答

4

首先,“迭代”在这里是一个错误的概念 - 它来自命令式编程的领域,您实际上在自己的数据结构上迭代。 Spark使用功能范例,让您通过函数来处理RDD中的每个记录(使用一些高阶函数,如map,foreach ...)。

在这种情况下,听起来像是想将中的每个元素都放到一个新的元素中。

仅仅映射你的元组的右手边(不改变左侧),你可以使用mapValues

// mapValues will map the "values" (of type List[Int]) to new values (of type Int) 
rdd.mapValues(list => list.zipWithIndex.indexWhere { 
    case (v, i) => someCondition(v, i) 
}) 

,或者,使用普通map

rdd.map { 
    case (key, list) => (key, list.zipWithIndex.indexWhere { 
    case (v, i) => someCondition(v, i) 
    }) 
} 
+0

感谢您的回应!我得到这个错误'价值indexWhere不是Iterable成员[(Int,Int)]' – lacrima

+0

我假设你的输入RDD的类型是'RDD [(String,List [Int])]] - 是不是案件?如果是这样 - 它是什么? –

+0

我会说这是,但我可能是错误的。名单成员来自wordcount。有没有办法检查对象的类型?或者明确定义它的类型? – lacrima