2017-10-12 125 views
0

我有具有以下结构的RDD:
((ByteArray, Idx), ((srcIdx,srcAdress), (destIdx,destAddress)))如何创建上的RDD(斯卡拉)嵌套for循环

此比特币blockchain的边缘(事务)的表示。 (ByteArray, Idx)可以看作是一个标识符,其余的是一个边缘。我的最终目标是聚合区块链图形表示中的节点。对此我需要做的第一次修改是将同一个比特币交易中的资源放在一个边缘(最终在一个节点中)。通过这种方式,我将“群集”属于同一用户的公钥。 此修改的结果将具有以下结构:
((ByteArray, Idx), (List((srcIdx, srcAddress)), (destIdx, destAddress)))
或者以任何其他形式具有相同的功能(例如,如果这在Scala中是不可能的或逻辑的)。

我目前的思维过程如下。在Java中,我会对RDD中的项目执行嵌套for循环,每个循环都为具有相同密钥的项目创建列表((ByteArray, Idx))。删除任何重复项后。 但是,由于我正在处理RDD和Scala,所以这是不可能的。接下来,我尝试在我的RDD上执行.collect(),然后单独使用.map()函数,并使用集合在我的映射函数中循环。但是,Spark不喜欢这样,因为显然集合不能被序列化。 接着我试图创建一个“嵌套”地图功能如下:

val aggregatedTransactions = joinedTransactions.map(f => { 
    var list = List[Any](f._2._1) 

    val filtered = joinedTransactions.filter(t => f._1 == t._1) 

    for(i <- filtered){ 
    list ::= i._2._1 
    } 

    (f._1, list, f._2._2) 
}) 

这是不允许的,因为该过滤器(或映射)的功能是不可用的.MAP()。有什么选择?

我对Scala相当陌生,所以任何有用的背景信息都非常感谢。

+0

我认为,鉴于你的问题的性质,这将是有益的,提供输入+输出的例子,以避免误解 – dk14

回答

5

我的最终目标是在区块链的图形表示中聚合节点。对此我需要做的第一次修改是将同一个比特币交易中的资源放在一个边缘(最终在一个节点中)。

所以基本上你想groupByKey

joinedTransactions.groupByKey().map { 
    // process data to get desired shape 
} 
-1

嵌套RDDS是不可能的。然而RDD内藏品是 可能。

嵌套for循环可以使用cartesian

DEF笛卡尔[U](其他:RDD [U])(隐式为arg0:ClassTag [U]):RDD [(T, U) ]永久链接返回此RDD的笛卡尔乘积和另一个 之一,即所有元素对(a,b)的RDD,其中a在 this中,而b在另一个中。

val nestedForRDD = rdd1.cartesian(rdd2) 

nestedForRDD.map((rdd1TypeVal, rdd2TypeVal) => { 
    //Do your inner-nested evaluation code here 
}) 

使用星火SQL还可以实现它。

http://bigdatums.net/2016/02/12/how-to-extract-nested-json-data-in-spark/