2015-11-06 130 views
0

我想知道什么样的Spark运行时采样RDD/DF与完整RDD/DF的运行时相比。我不知道它是否有所作为,但我目前正在使用Java + Spark 1.5.1 + Hadoop 2.6。Spark采样 - 比使用完整RDD/DataFrame速度快多少

JavaRDD<Row> rdd = sc.textFile(HdfsDirectoryPath()).map(new Function<String, Row>() { 
     @Override 
     public Row call(String line) throws Exception { 
      String[] fields = line.split(usedSeparator); 
      GenericRowWithSchema row = new GenericRowWithSchema(fields, schema);//Assum that the schema has 4 integer columns 
      return row; 
      } 
     }); 

DataFrame df = sqlContext.createDataFrame(rdd, schema); 
df.registerTempTable("df"); 
DataFrame selectdf = sqlContext.sql("Select * from df"); 
Row[] res = selectdf.collect(); 

DataFrame sampleddf = sqlContext.createDataFrame(rdd, schema).sample(false, 0.1);// 10% of the original DS 
sampleddf.registerTempTable("sampledf"); 
DataFrame selecteSampledf = sqlContext.sql("Select * from sampledf"); 
res = selecteSampledf.collect(); 

我期望采样速度最佳接近〜90%。但对我来说,它看起来像火花穿过整个DF或做一个计数,基本上几乎与完整DF选择相同。样品生成后,它执行选择。

我是否正确的这个假设或是以错误的方式使用的采样是什么导致我最终得到两个选择所需的相同运行时间?

回答

0

我认为采样速度最佳接近90%。

嗯,有几个原因,这些期望是不现实的:

  • 没有关于数据分布的任何前面的假设,以获得均匀的样品,你必须执行一个完整的数据集扫描。这或多或少会发生什么,当您在Spark中使用sampletakeSample方法时
  • SELECT *是一个相对轻量级的操作。取决于您有足够时间处理单个分区的资源量可以忽略不计
  • 采样不会减少分区数量。如果您不需要​​3210或repartition,则最终可能会有大量几乎为空的分区。这意味着不理想的资源使用情况。
  • ,同时随机数发生器通常是相当有效的产生随机数是不是免费的

有抽样至少有两个重要的好处:

  • 更低的内存使用率,包括垃圾收集
  • 较少的工作较少的数据进行序列化/反序列化并在洗牌或收集的情况下传输

如果您想要从采样中获得最大收益对采样,合并和缓存有意义。

+1

Grea,谢谢你的提示。我真的需要尝试合并,因为我也过滤了几次相同的rdd,这意味着如果我理解你的话,我最终会得到相同大小的rdd。 当我拥有比内存更多的数据时,Cachning有点问题。 还有一个问题。为什么当我将文件读入rdd时不会收集“分布假设”或大小? – user5490570

+0

我的意思是分布是一个统计性质的问题。如果对此有所了解,可以更聪明地进行抽样,特别是如果随机性不是一个硬性要求。例如参见[BlinkDB](http://blinkdb.org/) – zero323