在我的Spark应用程序中,我使用了一个数据量很大的JavaPairRDD<Integer, List<Tuple3<String, String, String>>>
。如何根据密钥从PairRDD获取新的RDD
而我的要求是,我需要一些基于密钥的Large PairRDD的其他RDDs JavaRDD<Tuple3<String, String, String>>
。
在我的Spark应用程序中,我使用了一个数据量很大的JavaPairRDD<Integer, List<Tuple3<String, String, String>>>
。如何根据密钥从PairRDD获取新的RDD
而我的要求是,我需要一些基于密钥的Large PairRDD的其他RDDs JavaRDD<Tuple3<String, String, String>>
。
我不知道Java的API,但这里是你会怎么做,在斯卡拉(在spark-shell
):
def rddByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, Seq[V])]) = {
rdd.keys.distinct.collect.map {
key => key -> rdd.filter(_._1 == key).values.flatMap(identity)
}
}
你必须filter
每个键与flatMap
扁平化List
秒。我不得不提的是,这不是一个有用的操作。如果您能够构建原始RDD,那意味着每个List
都足够小以适应内存。所以我不明白你为什么想把它们变成RDD。
也许我误解了这个问题?让我知道。 – 2015-04-02 12:13:16
为什么不只是过滤base rdd? – Krever 2015-04-02 10:27:16
使用java.util.stream.Stream过滤数据。请看看[链接](http://www.journaldev.com/2389/java-8-features-for-developers-lambdas-functional-interface-stream-and-time-api#java-stream-api) – 2015-04-02 10:31:39
在PairRDD中,我使用具有数百万个Tuple3的List。但是根据Tuple的第三个参数,我只需要该列表中的50个排序记录。 所以对此,我只是想创建一些新的Rdds,而不是在排序后有元组。 如果有其他方法,请告诉我。 –
Kaushal
2015-04-02 10:37:11