
Hadoop: successful MapReduce job, but no output

I want to merge data using MapReduce. I have several sets of data in the same folder.

Approach:

Within the program/flow, I run the MapReduce merge job multiple times in sequence.

Problem:

What I am facing is not failed tasks, but successful jobs that produce no output. The first iteration (sometimes the first two) always has output (part-r-00000), but the following ones do not. I am using sample data sets that are very small in size and volume (1-2 KB, about 5 files).

What I have tried:

Making the thread sleep for 5 seconds after each run, but to no avail. I also checked with WebHDFS a long while afterwards, and there was still no such file.

Could you please enlighten me on this? Thanks in advance.

Image:

Problem

Code:

/* 
* To change this license header, choose License Headers in Project Properties. 
* To change this template file, choose Tools | Templates 
* and open the template in the editor. 
*/ 
package mergedata; 

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.List; 
import java.util.regex.Matcher; 
import java.util.regex.Pattern; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileStatus; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 

import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

/** 
* 
* @author abcdefg 
*/ 
public class MergeData extends Configured implements Tool{ 

/** 
* @param args the command line arguments 
*/ 
public static class ReadMapper 
extends Mapper<Object, Text, Text, IntWritable>{ 
    @Override 
    public void map(Object key, Text value, Mapper.Context context 
    ) throws IOException, InterruptedException { 

     context.write(new Text(value.toString()), new IntWritable(1)); 
    } 
} 

public static class MergeReducer 
extends Reducer<Text,IntWritable,Text,IntWritable> { 
    private IntWritable result = new IntWritable(); 

    public void reduce(Text key, Iterable<IntWritable> values, 
      Reducer.Context context 
    ) throws IOException, InterruptedException { 
     int sum = 0; 
     for (IntWritable val : values) { 
      sum += val.get(); 
     } 
     result.set(sum); 
     context.write(key, result); 
    } 
} 

@Override 
public int run(String[] args) throws Exception { 

    Configuration conf = getConf(); 

    FileSystem hdfs = FileSystem.get(conf); 

    args = new GenericOptionsParser(conf, args).getRemainingArgs(); 
    if (args.length != 3) { 
     System.err.println(args.length); 
     System.err.println("Usage: mergedata <input folder> <temporary folder> <output folder>"); 
     System.exit(1); 
    } 
//  FileSystem fs = FileSystem.get(conf); 
//  ContentSummary cs = fs.getContentSummary(new Path(args[0])); 
//  long fileCount = cs.getFileCount(); 

    Job job = Job.getInstance(conf); 

    job.setJarByClass(MergeData.class); 
    job.setMapperClass(ReadMapper.class); 
    job.setReducerClass(MergeReducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 
//  String files =().replaceAll(",", "," + args[0] + "/"); 
//  FileInputFormat.addInputPaths(job, files); 

    int jobComplete = 1; 
    FileStatus[] fileStatus = hdfs.listStatus(new Path(args[0])); 

    HashMap<String,Pair<String,Long>> map = new HashMap<String,Pair<String,Long>>(); 

    String tempName; 
    String tempKey; 
    Path tempPath; 
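    // Group the files in the input folder by the first 12 characters of their name (e.g. "cdr_20150701");
    // each map entry keeps a comma-separated list of that group's paths and the group's total size in bytes.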
    for (FileStatus fileStatu : fileStatus) { 

     tempPath = fileStatu.getPath(); 
     tempName = tempPath.getName(); 
     tempKey = tempName.substring(0,12); 
     if (map.containsKey(tempKey)) { 
      map.put(tempKey,new Pair(map.get(tempKey).getLeft() + "," + 
        tempPath.toString(), 
        map.get(tempKey).getRight() + fileStatu.getLen())); 
     } else { 
      map.put(tempKey, new Pair(tempPath.toString(),fileStatu.getLen())); 
     } 
    } 

    String[] files = map.keySet().toArray(new String[map.keySet().size()]); 
    String[] inputFiles; 
//  String[] files = args[1].split(","); 
    for (String file : files) 
    { 
     System.out.println("file = " + file); 
//   FileInputFormat.addInputPath(job, new Path(args[0] + "/" + file + "*")); 
     System.out.println(args[2] + "/" + file); 
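     // If a previously merged main file for this group already exists in the output folder (args[2]),
     // add it to the inputs and to the expected total size so its contents survive the new merge.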
     if (hdfs.exists(new Path(args[2] + "/" + file))) { 
      System.out.println(file + " exists in " + args[2]); 
      map.put(file,new Pair(
        map.get(file).getLeft() + "," + args[2] + "/" + file, 
        map.get(file).getRight() + hdfs.getFileStatus(new Path(args[2] + "/" + file)).getLen() 
      )); 
     } 
     System.out.println("MR job input files : " + map.get(file).getLeft()); 
     FileInputFormat.setInputPaths(job, map.get(file).getLeft()); 

     System.out.println("MR job output dir : " + args[1] + "/" + file); 
     FileOutputFormat.setOutputPath(job ,new Path(args[1] + "/" + file)); 
     if (hdfs.exists(new Path(args[1] + "/" + file))) { 
      hdfs.delete(new Path(args[1] + "/" + file), true); // Shouldn't occur 
     } 
     jobComplete = Math.max(jobComplete, (job.waitForCompletion(true))? 0 : 1); 
      // hdfs.getFileStatus(tempFile) 
     if (job.isSuccessful()) { 
       // Following sequence includes size check before deleting files 

      FileStatus[] filesStatuz = hdfs.listStatus(new Path(args[1] + "/" + file + "/part-r-00000")); 

      System.out.println("filesStatuz[0].getLen() = " + filesStatuz[0].getLen()); 
      System.out.println("totalLen = " + map.get(file).getRight()); 
      if (filesStatuz[0].getLen() >= map.get(file).getRight()) { 

       if (hdfs.exists(new Path(args[2] + "/" + file))) { 
        System.out.println("Found the main file of " + file); 
        hdfs.rename(new Path(args[2] + "/" + file), new Path(args[2] + "/" + file + "_tmp")); 
       } 
       hdfs.rename(new Path(args[1] + "/" + file + "/part-r-00000"), new Path(args[2] + "/" + file)); 
       hdfs.delete(new Path(args[1] + "/" + file), true); 
       System.out.println("Done safe replacement"); 

//     hdfs.delete(new Path(args[0] + "/" + file + "*"), false); 
       inputFiles = map.get(file).getLeft().split(","); 
       for (String inputFile : inputFiles) { 
        if (!inputFile.equals(args[2] + "/" + file)) { 
         hdfs.delete(new Path(inputFile), false); 
         System.out.println(inputFile + " has been deleted"); 
        } 
       } 
       if (hdfs.exists(new Path(args[2] + "/" + file + "_tmp"))) { 
        hdfs.delete(new Path(args[2] + "/" + file + "_tmp"), false); 
        System.out.println("Deleted previous main file of " + file); 
       } 
      } 
      else { 
       System.out.println("Merging of " + file +"might have failed. Input and output size doesn't tally"); 
      } 
     }   
    } 
    return(jobComplete); 
} 

public static void main(String[] args) throws Exception { 
    // TODO code application logic here 
    int exitCode = ToolRunner.run(new MergeData(), args); 
    System.exit(exitCode); 
} 

public class Pair<L,R> { 

    private final L left; 
    private final R right; 

    public Pair(L left, R right) { 
     this.left = left; 
     this.right = right; 
    } 
    public L getLeft() { return left; } 
    public R getRight() { return right; } 

    @Override 
    public int hashCode() { return left.hashCode()^right.hashCode(); } 

    @Override 
    public boolean equals(Object o) { 
     if (!(o instanceof Pair)) return false; 
     Pair pairo = (Pair) o; 
     return this.left.equals(pairo.getLeft()) && 
       this.right.equals(pairo.getRight()); 
    } 

} 
} 

Flow:

In essence, it combines files with similar dates, e.g. cdr_20150701_0 and cdr_20150701_1 in the input folder (args[0]), into a main file (e.g. cdr_20150701) and places it in the merged folder (args[2]). If such a main file already exists before the merge, then all of the files, e.g. cdr_20150701_0, cdr_20150701_1 and cdr_20150701, are merged into a new cdr_20150701. The part-r-00000 is stored in a temporary folder (args[1]). After a successful transfer, the temporary folder and the input parts are deleted.


It seems the input directory/file does not exist... – vefthym


Can you post the code with which you are trying to run the job? That is not a successful job, it has a FileNotFoundException, which is obviously the problem. – fd8s0


Just the driver class should be enough to start with... I suspect it is simply a matter of not setting the second job's input path to be the first job's output path... – vefthym

Answer


Have you tried using the getmerge command? It might be useful in your case. If all you are doing is merging the data, you may not need a MapReduce job just for the merge.

hadoop fs -getmerge <src> <localdst> [addnl]

Takes a source directory and a destination file as input, and concatenates the files in src into the destination local file. Optionally, addnl can be set to add a newline character at the end of each file.

http://hadoop.apache.org/docs/r2.7.0/hadoop-project-dist/hadoop-common/FileSystemShell.html


I have taken your advice and it now works like a charm; in Java it is called copyMerge. Thanks – FailedMathematician
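
For reference, a minimal sketch of the FileUtil.copyMerge approach mentioned in the comment above, assuming Hadoop 2.x (copyMerge exists in the 2.x FileUtil API but was removed in Hadoop 3.x). The paths and the date-group directory layout are placeholders for illustration, not taken from the original post:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class CopyMergeExample {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);

        // Hypothetical layout: all parts of one date group live in their own directory.
        Path srcDir = new Path("/data/input/cdr_20150701");   // directory holding cdr_20150701_0, cdr_20150701_1, ...
        Path dstFile = new Path("/data/merged/cdr_20150701"); // single merged output file

        // Concatenates every file under srcDir into dstFile.
        // deleteSource=false keeps the original parts; addString=null inserts nothing between files.
        boolean merged = FileUtil.copyMerge(hdfs, srcDir, hdfs, dstFile, false, conf, null);
        System.out.println("copyMerge succeeded: " + merged);
    }
}

Note that copyMerge writes to whichever destination FileSystem you pass (here HDFS itself), whereas the shell getmerge command writes the concatenated result to the local file system.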