2017-03-08 115 views
1

我已经写了一个火花的工作。看起来像下面这样:火花正在洗牌大量数据

public class TestClass { 

public static void main(String[] args){ 
String masterIp = args[0]; 
String appName = args[1]; 
String inputFile = args[2]; 
String output = args[3]; 
SparkConf conf = new SparkConf().setMaster(masterIp).setAppName(appName); 
JavaSparkContext sparkContext = new JavaSparkContext(conf); 
JavaRDD<String> rdd = sparkContext.textFile(inputFile); 
Integer[] keyColumns = new Integer[] {0,1,2}; 
Broadcast<Integer[]> broadcastJob = sparkContext.broadcast(keyColumns); 

Function<Integer,Long> createCombiner = v1 -> Long.valueOf(v1); 
Function2<Long, Integer, Long> mergeValue = (v1,v2) -> v1+v2; 
Function2<Long, Long, Long> mergeCombiners = (v1,v2) -> v1+v2; 

JavaPairRDD<String, Long> pairRDD = rdd.mapToPair(new PairFunction<String, String, Integer>() { 
     private static final long serialVersionUID = -6293440291696487370L; 
     @Override 
     public Tuple2<String, Integer> call(String t) throws Exception { 
     String[] record = t.split(","); 
     Integer[] keyColumns = broadcastJob.value(); 
     StringBuilder key = new StringBuilder(); 
     for (int index = 0; index < keyColumns.length; index++) { 
      key.append(record[keyColumns[index]]); 
     } 
     key.append("|id=1"); 
     Integer value = new Integer(record[4]); 
     return new Tuple2<String, Integer>(key.toString(),value); 
     }}).combineByKey(createCombiner, mergeValue, mergeCombiners).reduceByKey((v1,v2) -> v1+v2); 
     pairRDD.saveAsTextFile(output); 
    } 
} 

该程序计算每个键的值的总和。 根据我的理解,本地组合器应该在每个节点上运行,并将相同键的值相加,然后在少量数据的情况下进行混洗。 但在SparkUI上,它显示了大量的随机读取和随机写入(差不多58GB)。 我做错了什么? 如何知道本地组合器是否工作?

群集细节: -
20个节点集群
具有80GB的硬盘来,8GB RAM,4个核每个节点
Hadoop的2.7.2
火花2.0.2(预生成与 - Hadoop的2.7.x分布)

输入文件的详细信息: -
输入文件存储在HDFS
输入文件大小:400GB
多项纪录:16129999990
战绩列:字符串(2 char),int,int,String(2 char),int,int,String(2 char),String(2 char),String(2 char) 在火花日志中,我看到使用localitylevel NODE_LOCAL运行的任务。

enter image description here

+0

你为什么要用combineByKey和reduceByKey? ReduceByKey将使用reducer作为组合器,就像在你的例子中一样 –

回答

0

让我们分解这个问题,看看得到什么。为了简化计算,让我们假设:

  • 记录总数为1.6e8
  • 独特的按键数是1E6
  • 分割大小为128MB(这似乎是与任务在你数UI一致)。

有了这些值,数据将被分成大约3200个分区(在你的情况下为3125个分区)。这给你每个分区大约51200条记录。此外,如果每个密钥的值数量分布均匀,那么每个密钥的平均值应该大约为160个记录。

如果数据是随机分布的(例如,它没有按键排序),那么可以预计平均每个分区每个键的记录数将接近1 *。这基本上是地图边合并根本不会减少数据量的最坏情况。

此外,您必须记住,平面文件的大小通常会显着降低序列化对象的大小。

对于现实生活中的数据,您通常可以期望从数据收集过程中出现某种类型的顺序,所以事情应该比我们上面计算的好,但底线是,如果数据尚未按分区分组,则地图侧组合可能没有提供任何改进。

通过使用更大的拆分(256MB会使您每个分区超过100K),您可以减少混洗数据的数量,但它会带来更长的GC暂停和其他GC问题的代价。


*您可以通过取样与替代模拟这个:

import pandas as pd 
import numpy as np 

(pd 
    .DataFrame({"x": np.random.choice(np.arange(3200), size=160, replace=True)}) 
    .groupby("x") 
    .x.count() 
    .mean()) 

或只是想想的随机分配160个球,3200桶的问题。