
I have thousands of small files that I want to process with CombineFileInputFormat, but isSplitable does not seem to take effect in CombineFileInputFormat.

With CombineFileInputFormat, a single mapper handles multiple small files, and no individual file should be split.

Here is a snippet of one of these small input files:

vers,3 
period,2015-01-26-18-12-00,438469546,449329626,complete 
config,libdvm.so,chromeview 
pkgproc,com.futuredial.digitchat,10021,,0ns:10860078 
pkgpss,com.futuredial.digitchat,10021,,0ns:9:6627:6627:6637:5912:5912:5912 
pkgsvc-run,com.futuredial.digitchat,10021,.LiveScreenService,1,0n:10860078 
pkgsvc-start,com.futuredial.digitchat,10021,.LiveScreenService,1,0n:10860078 
pkgproc,com.google.android.youtube,10103,,0ns:10860078 
pkgpss,com.google.android.youtube,10103,,0ns:9:12986:13000:13021:11552:11564:11580 
pkgsvc-run,com.google.android.youtube,10103,com.google.android.apps.youtube.app.offline.transfer.OfflineTransferService,1,0n:10860078 
pkgsvc-start,com.google.android.youtube,10103,com.google.android.apps.youtube.app.offline.transfer.OfflineTransferService,1,0n:10860078 

I want to pass the entire content of each file to the mapper. However, Hadoop cuts the file partway through.

For example, the file above may be split into:

vers,3 
period,2015-01-26-18-12-00,438469546,449329626,complete 
config,libdvm.so,chromeview 
pkgproc,com.futuredial.digitchat,#the line has been cut 

But I want to process the contents of the whole file.

Here is my code, which is based on Reading file as single record in hadoop.

The driver code:

public class CombineSmallfiles { 

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 

    Configuration conf = new Configuration(); 
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
    if (otherArgs.length != 2) { 
     System.err.println("Usage: conbinesmallfiles <in> <out>"); 
     System.exit(2); 
    } 

    conf.setInt("mapred.min.split.size", 1); 
    conf.setLong("mapred.max.split.size", 26214400); // 25m 
    //conf.setLong("mapred.max.split.size", 134217728); // 128m 

    //conf.setInt("mapred.reduce.tasks", 5); 

    Job job = new Job(conf, "combine smallfiles"); 
    job.setJarByClass(CombineSmallfiles.class); 
    job.setMapperClass(CombineSmallfileMapper.class); 
    //job.setReducerClass(IdentityReducer.class); 
    job.setNumReduceTasks(0); 

    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(Text.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 
    MultipleOutputs.addNamedOutput(job,"pkgproc",TextOutputFormat.class,Text.class,Text.class); 
    MultipleOutputs.addNamedOutput(job,"pkgpss",TextOutputFormat.class,Text.class,Text.class); 
    MultipleOutputs.addNamedOutput(job,"pkgsvc",TextOutputFormat.class,Text.class,Text.class); 

    job.setInputFormatClass(CombineSmallfileInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 

    FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 

    int exitFlag = job.waitForCompletion(true) ? 0 : 1; 
    System.exit(exitFlag); 

} 

} 

My mapper code:

public class CombineSmallfileMapper extends Mapper<NullWritable, Text, Text, Text> { 

    private Text file = new Text(); 
    private MultipleOutputs mos; 
    private String period; 
    private Long elapsed; 

    @Override 
    public void setup(Context context) throws IOException, InterruptedException { 
     mos = new MultipleOutputs(context); 
    }
    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
     mos.close(); // close MultipleOutputs so the named outputs are flushed
    }
    @Override 
    protected void map(NullWritable key, Text value, Context context) throws IOException, InterruptedException { 
     String file_name = context.getConfiguration().get("map.input.file.name"); 
     String [] filename_tokens = file_name.split("_"); 
     String uuid = filename_tokens[0]; 
     String [] datetime_tokens; 
     try{ 
     datetime_tokens = filename_tokens[1].split("-"); 
     }catch(ArrayIndexOutOfBoundsException err){ 
      throw new ArrayIndexOutOfBoundsException(file_name); 
     } 
     String year,month,day,hour,minute,sec,msec; 
     year = datetime_tokens[0]; 
     month = datetime_tokens[1]; 
     day = datetime_tokens[2]; 
     hour = datetime_tokens[3]; 
     minute = datetime_tokens[4]; 
     sec = datetime_tokens[5]; 
     msec = datetime_tokens[6]; 
     String datetime = year+"-"+month+"-"+day+" "+hour+":"+minute+":"+sec+"."+msec; 
     String content = value.toString(); 
     String []lines = content.split("\n"); 
     for(int u = 0;u<lines.length;u++){ 
      String line = lines[u]; 
      String []tokens = line.split(","); 
      if(tokens[0].equals("period")){ 
       period = tokens[1]; 
       try{ 
       long startTime = Long.valueOf(tokens[2]); 
       long endTime = Long.valueOf(tokens[3]); 
       elapsed = endTime-startTime; 
       }catch(NumberFormatException err){ 
        throw new NumberFormatException(line); 
       } 
      }else if(tokens[0].equals("pkgproc")){ 
       String proc_info = ""; 
       try{ 
       proc_info += period+","+String.valueOf(elapsed)+","+tokens[2]+","+tokens[3]; 
       }catch(ArrayIndexOutOfBoundsException err){ 
        throw new ArrayIndexOutOfBoundsException("pkgproc: "+content+ "line:"+line); 
       } 
       for(int i = 4;i<tokens.length;i++){ 
        String []state_info = tokens[i].split(":"); 
        String state = ""; 
        state += ","+state_info[0].charAt(0)+","+state_info[0].charAt(1)+","+state_info[0].charAt(2)+","+state_info[1]; 
        mos.write("pkgproc",new Text(tokens[1]), new Text(proc_info+state+','+uuid+','+datetime)); 
       } 
      }else if(tokens[0].equals("pkgpss")){ 
       String proc_info = ""; 
       proc_info += period+","+String.valueOf(elapsed)+","+tokens[2]+","+tokens[3]; 
       for(int i = 4;i<tokens.length;i++){ 
        String []state_info = tokens[i].split(":"); 
        String state = ""; 
        state += ","+state_info[0].charAt(0)+","+state_info[0].charAt(1)+","+state_info[0].charAt(2)+","+state_info[1]+","+state_info[2]+","+state_info[3]+","+state_info[4]+","+state_info[5]+","+state_info[6]+","+state_info[7]; 
        mos.write("pkgpss",new Text(tokens[1]), new Text(proc_info+state+','+uuid+','+datetime)); 
       } 
      }else if(tokens[0].startsWith("pkgsvc")){ 
       String []stateName = tokens[0].split("-"); 
       String proc_info = ""; 
       //tokens[2] = uid, tokens[3] = serviceName 
       proc_info += stateName[1]+','+period+","+String.valueOf(elapsed)+","+tokens[2]+","+tokens[3]; 
       String opcount = tokens[4]; 
       for(int i = 5;i<tokens.length;i++){ 
        String []state_info = tokens[i].split(":"); 
        String state = ""; 
        state += ","+state_info[0].charAt(0)+","+state_info[0].charAt(1)+","+state_info[1]; 
        mos.write("pkgsvc",new Text(tokens[1]), new Text(proc_info+state+','+opcount+','+uuid+','+datetime)); 
       } 
      } 
     } 
    } 

} 

My CombineFileInputFormat, which overrides isSplitable and returns false:

public class CombineSmallfileInputFormat extends CombineFileInputFormat<NullWritable, Text> { 

    @Override 
    public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException { 

     return new CombineFileRecordReader<NullWritable,Text>((CombineFileSplit) split,context,WholeFileRecordReader.class); 
    } 
    @Override 
    protected boolean isSplitable(JobContext context,Path file){ 
     return false; 
    } 

} 

The WholeFileRecordReader:

public class WholeFileRecordReader extends RecordReader<NullWritable, Text> { 
    //private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class); 

     /** The path to the file to read. */ 
     private final Path mFileToRead; 
     /** The length of this file. */ 
     private final long mFileLength; 

     /** The Configuration. */ 
     private final Configuration mConf; 

     /** Whether this FileSplit has been processed. */ 
     private boolean mProcessed; 
     /** Single Text to store the file name of the current file. */ 
    // private final Text mFileName; 
     /** Single Text to store the value of this file (the value) when it is read. */ 
     private final Text mFileText; 

     /** 
     * Implementation detail: This constructor is built to be called via 
     * reflection from within CombineFileRecordReader. 
     * 
     * @param fileSplit The CombineFileSplit that this will read from. 
     * @param context The context for this task. 
     * @param pathToProcess The path index from the CombineFileSplit to process in this record. 
     */ 
     public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context, 
      Integer pathToProcess) { 
     mProcessed = false; 
     mFileToRead = fileSplit.getPath(pathToProcess); 
     mFileLength = fileSplit.getLength(pathToProcess); 
     mConf = context.getConfiguration(); 
     context.getConfiguration().set("map.input.file.name", mFileToRead.getName()); 

     assert 0 == fileSplit.getOffset(pathToProcess); 
     //if (LOG.isDebugEnabled()) { 
      //LOG.debug("FileToRead is: " + mFileToRead.toString()); 
      //LOG.debug("Processing path " + pathToProcess + " out of " + fileSplit.getNumPaths()); 

      //try { 
      //FileSystem fs = FileSystem.get(mConf); 
      //assert fs.getFileStatus(mFileToRead).getLen() == mFileLength; 
      //} catch (IOException ioe) { 
      //// oh well, I was just testing. 
      //} 
     //} 

     //mFileName = new Text(); 
     mFileText = new Text(); 
     } 

     /** {@inheritDoc} */ 
     @Override 
     public void close() throws IOException { 
     mFileText.clear(); 
     } 

     /** 
     * Returns the absolute path to the current file. 
     * 
     * @return The absolute path to the current file. 
     * @throws IOException never. 
     * @throws InterruptedException never. 
     */ 
     @Override 
     public NullWritable getCurrentKey() throws IOException, InterruptedException { 
     return NullWritable.get(); 
     } 

     /** 
     * <p>Returns the current value. If the file has been read with a call to NextKeyValue(), 
     * this returns the contents of the file as a BytesWritable. Otherwise, it returns an 
     * empty BytesWritable.</p> 
     * 
     * <p>Throws an IllegalStateException if initialize() is not called first.</p> 
     * 
     * @return A BytesWritable containing the contents of the file to read. 
     * @throws IOException never. 
     * @throws InterruptedException never. 
     */ 
     @Override 
     public Text getCurrentValue() throws IOException, InterruptedException { 
     return mFileText; 
     } 

     /** 
     * Returns whether the file has been processed or not. Since only one record 
     * will be generated for a file, progress will be 0.0 if it has not been processed, 
     * and 1.0 if it has. 
     * 
     * @return 0.0 if the file has not been processed. 1.0 if it has. 
     * @throws IOException never. 
     * @throws InterruptedException never. 
     */ 
     @Override 
     public float getProgress() throws IOException, InterruptedException { 
     return (mProcessed) ? (float) 1.0 : (float) 0.0; 
     } 

     /** 
     * All of the internal state is already set on instantiation. This is a no-op. 
     * 
     * @param split The InputSplit to read. Unused. 
     * @param context The context for this task. Unused. 
     * @throws IOException never. 
     * @throws InterruptedException never. 
     */ 
     @Override 
     public void initialize(InputSplit split, TaskAttemptContext context) 
      throws IOException, InterruptedException { 
     // no-op. 
     } 

     /** 
     * <p>If the file has not already been read, this reads it into memory, so that a call 
     * to getCurrentValue() will return the entire contents of this file as Text, 
     * and getCurrentKey() will return the qualified path to this file as Text. Then, returns 
     * true. If it has already been read, then returns false without updating any internal state.</p> 
     * 
     * @return Whether the file was read or not. 
     * @throws IOException if there is an error reading the file. 
     * @throws InterruptedException if there is an error. 
     */ 
     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
     if (!mProcessed) { 
      if (mFileLength > (long) Integer.MAX_VALUE) { 
      throw new IOException("File is longer than Integer.MAX_VALUE."); 
      } 
      byte[] contents = new byte[(int) mFileLength]; 

      FileSystem fs = mFileToRead.getFileSystem(mConf); 
      FSDataInputStream in = null; 
      try { 
      // Set the contents of this file. 
      in = fs.open(mFileToRead); 
      IOUtils.readFully(in, contents, 0, contents.length); 
      mFileText.set(contents, 0, contents.length); 

      } finally { 
      IOUtils.closeQuietly(in); 
      } 
      mProcessed = true; 
      return true; 
     } 
     return false; 
     } 

} 

I expect each mapper to parse multiple small files, with no small file being split.

However, the code above cuts (splits) my input files and raises parsing errors (since my parser splits each line into tokens).

My understanding is that CombineFileInputFormat gathers multiple files into one split, each split is fed to one mapper, and therefore one mapper can handle multiple files.

In my code the maximum input split size is set to 25 MB, so I suspect the problem is that CombineFileInputFormat cuts the last small file of an input split in order to satisfy the split size limit.

However, I have already overridden isSplitable to return false, and it still splits the small files.
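One way to check how the files are actually being packed is to dump the splits the input format produces. The sketch below is only illustrative (DumpCombineSplits is a made-up helper class name; it reuses the CombineSmallfileInputFormat defined above and the same 25 MB limit): a non-zero offset or a length smaller than the file size in its output would confirm that a file is being cut.

import java.util.List; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 

public class DumpCombineSplits { 
    public static void main(String[] args) throws Exception { 
        Configuration conf = new Configuration(); 
        conf.setLong("mapred.max.split.size", 26214400); // same 25 MB cap as the job 
        Job job = new Job(conf, "dump combine splits"); 
        FileInputFormat.addInputPath(job, new Path(args[0])); 

        // Ask the custom input format for its splits, without running the job. 
        List<InputSplit> splits = new CombineSmallfileInputFormat().getSplits(job); 
        for (InputSplit s : splits) { 
            CombineFileSplit cfs = (CombineFileSplit) s; 
            System.out.println("split: " + cfs.getNumPaths() + " file(s), " 
                    + cfs.getLength() + " bytes total"); 
            for (int i = 0; i < cfs.getNumPaths(); i++) { 
                // offset > 0 or a partial length means this file was cut across chunks 
                System.out.println("  " + cfs.getPath(i) 
                        + " offset=" + cfs.getOffset(i) 
                        + " length=" + cfs.getLength(i)); 
            } 
        } 
    } 
} 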

What is the correct way to do this?

I am also not sure whether it is possible to specify the number of files per mapper, rather than an input split size?

Answer


Use the setMaxSplitSize() method in the constructor and it should work; it is best to tell it the split size explicitly:

public class CFInputFormat extends CombineFileInputFormat<FileLineWritable, Text> { 
    public CFInputFormat(){ 
    super(); 
    setMaxSplitSize(67108864); // 64 MB, default block size on hadoop 
    } 
    public RecordReader<FileLineWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException{ 
    return new CombineFileRecordReader<FileLineWritable, Text>((CombineFileSplit)split, context, CFRecordReader.class); 
    } 
    @Override 
    protected boolean isSplitable(JobContext context, Path file){ 
    return false; 
    } 
} 
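
For reference, only the driver's input format registration has to change to use this; a minimal sketch (CFInputFormat, FileLineWritable and CFRecordReader are the answer's classes, not Hadoop's, and the mapper's input key type would also have to become FileLineWritable to match):

// in the driver, instead of CombineSmallfileInputFormat: 
job.setInputFormatClass(CFInputFormat.class); 
// the mapper signature then becomes Mapper<FileLineWritable, Text, Text, Text> 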

It still splits my small files, even when I use setMaxSplitSize(67108864); – 2014-12-11 11:24:59