2016-11-24 70 views
0

我想通过使用下面的嵌套循环(下面)简单地找到匹配来执行连接。作业在较小的数据集上运行良好,但会停留在较大的数据集上。它通过减速器直到它达到99%或98%并且只是挂起。我不知道这是一个内存问题,还是reducer在超过一定数量的记录时就不能处理for循环计算。我想指出的是,如果我忽略了代码的for循环部分,作业就会在更大的数据集上完成。Hadoop Reducer无法处理嵌套循环计算?

public void reduce (Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 
String kjoin=key.toString(); 
String result=""; 
List<String[]> tGrad = new ArrayList<String[]>(); 
List<String[]> tUni = new ArrayList<String[]>(); 
List<String[]> tDep = new ArrayList<String[]>(); 
List<String[]> memOf = new ArrayList<String[]>(); 
List<String[]> subOrg = new ArrayList<String[]>(); 
List<String[]> ungrad = new ArrayList<String[]>(); 
String line=""; 
String source=""; 
int i=0; 

for (Text value : values){ 
line=value.toString(); 
String[] parts=new String[2]; 

source=line.substring(0,line.indexOf(",")); 
// everything before the first comma. 
parts[0]=line.substring(line.indexOf(",")+1,line.lastIndexOf(",")); 
// between first comma and last. 
parts[1]=line.substring(line.lastIndexOf(",")+1); 
// after last comma 

//separate components 

    if (source.equals("tGrad")){ 
     tGrad.add(parts); 


     } else if (source.equals("tUni")) { 
      tUni.add(parts); 


      } else if (source.equals("tDep")){ 
       tDep.add(parts); 

        }else if (source.equals("memOf")){ 
         memOf.add(parts); 

         } else if (source.equals("subOrg")) { 
           subOrg.add(parts); 

           } else if (source.equals("ungrad")){ 
            ungrad.add(parts); 

        }//end if/else 



source=null; 
line=null; 
} // end for loop for iteration over values. 



//join tuples 
for (String[] so: subOrg){ 

    for (String[] mo: memOf){ 
    if (so[0].equals(mo[1])){ 
     for (String[] ug: ungrad){ 
     if (so[1].equals(ug[1]) && mo[0].equals(ug[0])){ 
      for (String[] tu: tUni){ 
       if (ug[1].equals(tu[0])){ 
       for (String[] td: tDep){ 
        if (mo[1].equals(td[0])){ 
        for (String[] tg: tGrad){ 
        if (mo[0].equals(tg[0])){ 



    result="f("+td[0]+","+mo[0]+","+mo[1]+","+so[0]+","+so[1]+","+tu[0]+","+ug[0]+ug[1]+tg[0]+","+")"; 
    context.write(NullWritable.get(),new Text(result));/
        } //end 1st if 
       } // end 2nd if 

      } // end for 
     } // end for 
    } //end for 

    } 
} 
} 
    }}  
    }//end method 
}//end class 

回答

0

这不是由于内存或减速的限制,但它由于你正在做computation.It将是很好的方式,如果你使用的地图减少,而不是做所有的计算单机内部定义你的加入计算。如果你在独立代码中运行相同的计算,你会看到相同的问题。请通过这个link了解如何在mapreduce中加入

+0

thanx的答案,你的权利。但是我正在执行多路连接,我宁愿它在单个reducer中处理,否则我需要将其分解为几个中间双向连接,因此需要多个作业。 – zaranaid