2015-04-02 47 views
2

在我的Spark应用程序中,我使用了一个数据量很大的JavaPairRDD<Integer, List<Tuple3<String, String, String>>>如何根据密钥从PairRDD获取新的RDD

而我的要求是,我需要一些基于密钥的Large PairRDD的其他RDDs JavaRDD<Tuple3<String, String, String>>

+0

为什么不只是过滤base rdd? – Krever 2015-04-02 10:27:16

+0

使用java.util.stream.Stream过滤数据。请看看[链接](http://www.journaldev.com/2389/java-8-features-for-developers-lambdas-functional-interface-stream-and-time-api#java-stream-api) – 2015-04-02 10:31:39

+0

在PairRDD中,我使用具有数百万个Tuple3 的List。但是根据Tuple的第三个参数,我只需要该列表中的50个排序记录。 所以对此,我只是想创建一些新的Rdds,而不是在排序后有元组。 如果有其他方法,请告诉我。 – Kaushal 2015-04-02 10:37:11

回答

3

我不知道Java的API,但这里是你会怎么做,在斯卡拉(在spark-shell):

def rddByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, Seq[V])]) = { 
    rdd.keys.distinct.collect.map { 
    key => key -> rdd.filter(_._1 == key).values.flatMap(identity) 
    } 
} 

你必须filter每个键与flatMap扁平化List秒。我不得不提的是,这不是一个有用的操作。如果您能够构建原始RDD,那意味着每个List都足够小以适应内存。所以我不明白你为什么想把它们变成RDD。

+0

也许我误解了这个问题?让我知道。 – 2015-04-02 12:13:16