2013-03-25 56 views
6

重写RecordReader类的方法“下一步”和TextInputFormat类的“getRecordReader”以便发送整个段落到映射器而不是逐行。 (我用旧的API和认定中对我的款追加至一个空行来在我的文本文件的时间。)
下面是我的代码:覆盖RecordReader一次而不是行

public class NLinesInputFormat extends TextInputFormat 
{ 
    @Override 
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter)throws IOException  { 
     reporter.setStatus(split.toString()); 
     return new ParagraphRecordReader(conf, (FileSplit)split); 
    } 
} 



public class ParagraphRecordReader implements RecordReader<LongWritable, Text> 
{ 
     private LineRecordReader lineRecord; 
     private LongWritable lineKey; 
     private Text lineValue; 
     public ParagraphRecordReader(JobConf conf, FileSplit split) throws IOException { 
      lineRecord = new LineRecordReader(conf, split); 
      lineKey = lineRecord.createKey(); 
      lineValue = lineRecord.createValue(); 
     } 

     @Override 
     public void close() throws IOException { 
      lineRecord.close(); 
     } 

     @Override 
     public LongWritable createKey() { 
      return new LongWritable(); 

     } 

     @Override 
     public Text createValue() { 
      return new Text(""); 

     } 

     @Override 
     public float getProgress() throws IOException { 
      return lineRecord.getPos(); 

     } 

     @Override 
     public synchronized boolean next(LongWritable key, Text value) throws IOException { 
      boolean appended, gotsomething; 
      boolean retval; 
      byte space[] = {' '}; 
      value.clear(); 
      gotsomething = false; 
      do { 
       appended = false; 
       retval = lineRecord.next(lineKey, lineValue); 
       if (retval) { 
        if (lineValue.toString().length() > 0) { 
         byte[] rawline = lineValue.getBytes(); 
         int rawlinelen = lineValue.getLength(); 
         value.append(rawline, 0, rawlinelen); 
         value.append(space, 0, 1); 
         appended = true; 
        } 
        gotsomething = true; 
       } 
      } while (appended); 

      //System.out.println("ParagraphRecordReader::next() returns "+gotsomething+" after setting value to: ["+value.toString()+"]"); 
      return gotsomething; 
     } 

     @Override 
     public long getPos() throws IOException { 
      return lineRecord.getPos(); 
     } 
    } 

问题:
1.我没有找到任何具体的指导如何做到这一点,所以可能是我做错了,请评论任何建议?
2.我能够正确编译,但是当我运行我的作业时,我的映射器不断运行,我无法弄清楚问题出在哪里?

+0

您是否尝试过仅输入一个段落? – Amar 2013-03-25 09:02:21

+0

我认为你有一个bug;当你穿越分裂时你会得到额外的段落。我认为你需要区分从0开始的分割和其他分割。从0开始的第一行开始一段,但以行开头的分割不应该开始一个新段落。 (通常情况下,你已经读过一个拆分边界,所以如果你的拆分文件有连续段落的行,它们将会被前一个拆分文件所发出)。我错过了什么吗? – 2017-04-15 22:10:56

回答

3

你的代码对我来说工作得很好。 我做的唯一改变是将这些类作为内部类并使它们变为静态。

输入文件如下:

This is awesome. 
WTF is this. 

This is just a test. 

映射器的代码看起来像:

@Override 
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) 
    throws IOException { 

    System.out.println(key+" : "+value); 
} 

而且输出是:

0 : This is awesome. WTF is this. 
0 : This is just a test. 

我相信你会避风港忘记设置输入格式,但为了以防万一,请将其设置为以下内容ws:

conf.setInputFormat(NLinesInputFormat.class); 
+0

感谢您回复Amar!..我使用这些类作为公共静态,并设置了Inputformat,但我没有尝试使用小段落,我正在用一个大文件进行测试。我会这样做,让你知道它是如何发生的。 – JackSparrow 2013-03-25 14:06:38

+0

嘿谢谢人......我检查了短输入文件,它对长文件工作正常这是一些格式问题,我已经弄明白了! – JackSparrow 2013-03-25 17:15:00

+0

@Amar是一个hadoop的初学者,你能解释下一个方法内部发生了什么吗?你能解释我实现的逻辑吗?我在这方面需要一点帮助。 – user1585111 2013-09-02 10:53:19