2017-03-09 90 views
0

我有一个带Integer键和Integer []值的PairRDD rdd1spark - 如何在另一个RDD的转换内查找(Java)PairRDD的键和值

我也有另一个PairRDD rdd2与Integer键和Double值。

键中的每个整数AND值rdd1也作为键存在于rdd2中。

我想为rdd1每对(x, [y1,y2,...,yn])得到x的双重价值的每个整数y1y2,...,yn所有的双重价值。

我试图收集rdd2作为Map<Integer,Double>map2),但它不适合在内存中,我得到OOM错误。我也尝试加入rdds,但我无法弄清楚如何加入密钥和值。 的lookup()方法使用rdd1是不允许的。

的我想要的伪代码如下:

map each (int x, int[] y) in rdd1 to: 
     (x, map2.get(x) + sum(map2.get(yi))) 

每个yiy

我使用Java,但我想在Java和Scala中都存在同样的问题。

回答

1

根据您想要对丢失的匹配所做的操作(rdd1中存在索引并且rdd2中没有相应索引的情况),查询类似于以下内容。

rdd1. 
    // (x, [ y1, ..., yn ]) -> (x, x), (y1, x), ..., (yn, x) 
    flatMap { case (x, ys) => (x :: ys).map((_, x)) }. 
    // (xory, x) -> (xory, (x, rdd2.lookup(xory))) 
    leftOuterJoin(rdd2). 
    // (xory, (x, rdd2.lookup(xory))) -> (x, rdd2.lookup(xory)) 
    map(_._2). 
    // (x, rdd2.lookup(x)), ... -> (x, rdd2.lookup(x) + sum_i(rdd2.lookup(y_i)) 
    reduceByKey{ case (dopt1, dopt2) => (dopt1 ++ dopt2).reduceOption(_ + _) }. 
    // unwrap the option types 
    mapValues(_.getOrElse(0.0)) 
-1
HashMap<Integer, List<Integer>> map = new HashMap<>(); 
    map.put(1,asList(2,3)); 
    map.put(3,asList(4,5)); 

    System.out.println(
      map.entrySet().stream() 
        .flatMap(kv -> 
          Stream.concat(
            Stream.of((double)kv.getKey()), 
            kv.getValue().stream().mapToDouble(x -> Double.valueOf((double)x)).boxed()) 
        ) 
        .collect(Collectors.toList()) 
      ); 

这个怎么样? ...应该在一个RDD中为您提供所有(键和值),您可以在第二个RDD中将它们用作键。你当然可以改变类型。

相关问题