2013-07-19 52 views
1

我建立了一个基本的网页解析器,它使用hadoop来把URL传递给多个线程。这很好地工作,直到我到达输入文件的末尾,Hadoop会在线程仍在运行时声明自己已完成。这会导致org.apache.hadoop.fs.FSError错误:java.io.IOException:Stream Closed。无论如何要保持这个流线足够长的时间来完成线程吗? (我可以用合理的准确度预测线程在单个url上花费的最大时间量)。如何防止hadoop流关闭?

继承人我是如何执行的线程

public static class Map extends MapReduceBase implements 
      Mapper<LongWritable, Text, Text, Text> { 
     private Text word = new Text(); 
     private URLPile pile = new URLPile(); 
     private MSLiteThread[] Threads = new MSLiteThread[16]; 
     private boolean once = true; 

     @Override 
     public void map(LongWritable key, Text value, 
       OutputCollector<Text, Text> output, Reporter reporter) { 

      String url = value.toString(); 
      StringTokenizer urls = new StringTokenizer(url); 
      Config.LoggerProvider = LoggerProvider.DISABLED; 
      System.out.println("In Mapper"); 
      if (once) { 
       for (MSLiteThread thread : Threads) { 
        System.out.println("created thread"); 
        thread = new MSLiteThread(pile); 
        thread.start(); 
       } 
       once = false; 
      } 

      while (urls.hasMoreTokens()) { 
       try { 
        word.set(urls.nextToken()); 
        String currenturl = word.toString(); 
        pile.addUrl(currenturl, output); 

       } catch (Exception e) { 
        e.printStackTrace(); 
        continue; 
       } 

      } 

     } 

螺纹自己得到这样

public void run(){ 
      try { 
      sleep(3000); 
       while(!done()){ 
        try { 
        System.out.println("in thread"); 
         MSLiteURL tempURL = pile.getNextURL(); 
         String currenturl = tempURL.getURL(); 
         urlParser.parse(currenturl); 
         urlText.set(""); 
         titleText.set(currenturl+urlParser.export()); 
         System.out.println(urlText.toString()+titleText.toString()); 
         tempURL.getOutput().collect(urlText, titleText); 
         pile.doneParsing(); 
        sleep(30); 
        } catch (Exception e) { 
          pile.doneParsing(); 
        e.printStackTrace(); 
         continue; 
        } 
       } 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      System.out.println("Thread done"); 

     } 

的网址,并在urlpile相关的方法是

public synchronized void addUrl(String url,OutputCollector<Text, Text> output) throws InterruptedException { 
     while(queue.size()>16){ 
      System.out.println("queue full"); 
      wait(); 
     } 
     finishedParcing--; 
     queue.add(new MSLiteURL(output,url)); 
     notifyAll(); 
    } 

    private Queue<MSLiteURL> queue = new LinkedList<MSLiteURL>(); 
    private int sent = 0; 
    private int finishedParcing = 0; 
    public synchronized MSLiteURL getNextURL() throws InterruptedException { 

     notifyAll(); 
     sent++; 
     //System.out.println(queue.peek()); 
     return queue.remove(); 

    } 

回答

1

正如我可以推断从下面的评论中,你可以在每个map()函数中做到这一点,使事情变得简单。 我看到你做了以下事情,以预先创建一些空闲线程。 可以所以下面的代码移到

if (once) { 
    for (MSLiteThread thread : Threads) { 
    System.out.println("created thread"); 
    thread = new MSLiteThread(pile); 
    thread.start(); 
    } 
once = false; 
} 

到,

public static class Map extends MapReduceBase implements 
      Mapper<LongWritable, Text, Text, Text> { 
    @Override 
    public void configure(JobConf job) { 
     for (MSLiteThread thread : Threads) { 
     System.out.println("created thread"); 
     thread = new MSLiteThread(pile); 
     thread.start(); 
     } 
    } 

    @Override 
    public void map(LongWritable key, Text value, 
     OutputCollector<Text, Text> output, Reporter reporter) { 
    } 

} 

,这可能得到一次初始化,对于这个问题,不需要“一次”条件检查了。

此外,您不需要像上面那样制作空闲线程。 我不知道你会得到多少性能增益创建16个空闲线程。

不管怎么说,这里是一个解决方案(可能不是十全十美)

您可以使用类似的CountDownLatch Read more here处理您的网址或N个批次越来越封锁,直到他们完成。这是因为,如果将每个传入的URL记录释放到一个线程中,下一个URL将被立即取回,并且当您以相同的方式处理最后一个url时,即使您还有线程,map()函数也会返回在队列中进行处理。你将不可避免地得到你提到的例外。

这里举一个例子,说明如何使用倒数计时器阻塞。

public static class Map extends MapReduceBase implements 
       Mapper<LongWritable, Text, Text, Text> { 

      @Override 
      public void map(LongWritable key, Text value, 
       OutputCollector<Text, Text> output, Reporter reporter) { 

       String url = value.toString(); 
       StringTokenizer urls = new StringTokenizer(url); 
       Config.LoggerProvider = LoggerProvider.DISABLED; 

      //setting countdownlatch to urls.countTokens() to block off that many threads. 
      final CountDownLatch latch = new CountDownLatch(urls.countTokens()); 
      while (urls.hasMoreTokens()) { 
       try { 
        word.set(urls.nextToken()); 
        String currenturl = word.toString(); 
        //create thread and fire for current URL here 
        thread = new URLProcessingThread(currentURL, latch); 
        thread.start(); 
       } catch (Exception e) { 
        e.printStackTrace(); 
        continue; 
       } 

      } 

      latch.await();//wait for 16 threads to complete execution 
      //sleep here for sometime if you wish 

     } 

    } 

最后,URLProcessingThread只要一个URL处理减少锁存计数器,

public class URLProcessingThread implments Runnable { 
    CountDownLatch latch; 
    URL url; 
    public URLProcessingThread(URL url, CountDownLatch latch){ 
     this.latch = latch; 
     this.url = url; 
    } 
    void run() { 
     //process url here 
     //after everything finishes decrement the latch 
     latch.countDown();//reduce count of CountDownLatch by 1 

    } 
} 

与您的代码看出大概问题:pile.addUrl(currenturl, output);,当你添加一个新的URL,在此期间所有16个线程都会得到更新(我不太确定),因为同一个一堆对象被传递给16个线程。有可能你的网址被重新处理,或者你可能会得到一些其他的副作用(我对此不太确定)。

其他建议:

此外,您可能需要使用

mapred.task.timeout

(默认值= 600000ms)= 10分钟

Description: The number of milliseconds before a task will be terminated if it neither reads an input, writes an output, nor updates its status string.

您可以添加/覆盖此增加地图任务超时property map in mapred-site.xml

+0

那dec如果它真的超时,这个任务就会失败,这并不是我想要的,但它似乎是正确的。 – Chenab

+1

啊!我可能错过了问题中的一些细节。你是说你有从单个地图任务运行的线程,并且当地图完成处理它的输入时,Hadoop退出了吗? –

+0

或多或少。线程一段时间来处理每个输入,这就是为什么我有更多的一个。然而,一旦hadoop声明地图任务完成,线程就没有放置输出的地方。 – Chenab