2015-02-09 149 views
12

我使用java的spark,并且我拥有500万行RDD。是否有一种溶剂可以让我计算RDD的行数。我试过RDD.count(),但这需要很长时间。我已经看到,我可以使用功能fold。但是我没有找到这个函数的java文档。 请你告诉我如何使用它或向我展示另一种解决方案来获取RDD的行数。计算RDD中的行数

这里是我的代码:

JavaPairRDD<String, String> lines = getAllCustomers(sc).cache(); 
    JavaPairRDD<String,String> CFIDNotNull = lines.filter(notNull()).cache(); 
    JavaPairRDD<String, Tuple2<String, String>> join =lines.join(CFIDNotNull).cache(); 


    double count_ctid = (double)join.count(); // i want to get the count of these three RDD 
    double all = (double)lines.count(); 
    double count_cfid = all - CFIDNotNull.count(); 
    System.out.println("********** :"+count_cfid*100/all +"% and now : "+ count_ctid*100/all+"%"); 

谢谢。

回答

42

你当时的想法是:使用rdd.count()计算行数。没有更快的方法。

我想你应该问的问题是为什么rdd.count()这么慢?

答案是rdd.count()是一个“动作”—它是一个急切的操作,因为它必须返回一个实际的数字。您在count()之前执行的RDD操作是“转换”—他们将RDD转换为另一种懒惰。实际上,这些转换并没有实际执行,只是排队。当您致电count()时,您强制执行所有先前的懒惰操作。现在需要加载输入文件,执行map() s和filter(),执行洗牌等,直到最终获得数据并可以说明它有多少行。

请注意,如果您拨打count()两次,所有这一切将发生两次。计数返回后,所有数据都将被丢弃!如果您想避免这种情况,请在RDD上拨打cache()。然后第二个电话count()将是快速的,也派生的RDD将更快计算。但是,在这种情况下,RDD必须存储在内存(或磁盘)中。

7

丹尼尔对count的解释是正确的。但是,如果您愿意接受近似值,则可以尝试countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] RDD方法。 (但请注意,这被标记为“实验”)。