2010-08-04 50 views
7

我正在尝试编写一个多线程的网络爬虫。Java ThreadPool用法

我的主入口类具有下面的代码:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers); 
while(true){ 
    URL url = frontier.get(); 
    if(url == null) 
     return; 
exec.execute(new URLCrawler(this, url)); 
} 

的URLCrawler获取指定的URL,解析HTML链接提取从它,和时间表看不见的链接回到前沿。

边界是未爬行URL的队列。问题是如何编写get()方法。 如果队列为空,它应该等到任何URLCrawlers完成后再重试。 只有当队列为空且当前没有活动的URLCrawler时,它才应返回null。

我的第一个想法是使用AtomicInteger来计算工作URLCrawlers的当前数量和notifyAll()/ wait()调用的辅助对象。每个爬虫在开始时递增当前工作的URLCrawler的数量,并在退出时递减它,并通知对象它已完成。

但我读了notify()/ notifyAll()和wait()是一些不赞成使用的方法来进行线程通信。

我应该在这个工作模式中使用什么?它与M生产者和N个消费者类似,问题是如何处理生产者的繁琐程度。

回答

1

我认为在这种情况下使用wait/notify是合理的。想不到任何直接的方式来使用j.u.c来做到这一点。
在一个类,姑且称之为协调员:

private final int numOfCrawlers; 
private int waiting; 

public boolean shouldTryAgain(){ 
    synchronized(this){ 
     waiting++; 
     if(waiting>=numOfCrawlers){ 
      //Everybody is waiting, terminate 
      return false; 
     }else{ 
      wait();//spurious wake up is okay 
      //waked up for whatever reason. Try again 
      waiting--; 
      return true; 
     } 
    } 

public void hasEnqueued(){ 
    synchronized(this){ 
     notifyAll(); 
    } 
} 

然后,

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers); 
while(true){ 
    URL url = frontier.get(); 
    if(url == null){ 
     if(!coordinator.shouldTryAgain()){ 
      //all threads are waiting. No possibility of new jobs. 
      return; 
     }else{ 
      //Possible that there are other jobs. Try again 
      continue; 
     } 
    } 
    exec.execute(new URLCrawler(this, url)); 
}//while(true) 
3

我不知道我理解你的设计,但是这可能是一个Semaphore

3

一种选择一个工作就是让“边疆”阻塞队列,因此,任何线程试图“获取”,从它会阻止。 只要任何其他URLCrawler将对象放入该队列,任何其他线程将自动通知(与对象出队)

+0

是的,这是一个稳定状态的解决方案。但是,如果没有任何URLCrawlers排队任何URL,那么如何处理这种情况呢?对于阻塞队列,边界将无限地阻塞。 – 2010-08-04 05:55:17

+0

在这种情况下,您可以在每次UrlCrawler完成工作时调用的边境对象上有一个crawlerDone()方法。这种方法与您建议的计数器方法一起,您可以测试(在您的边界方法中)是否所有抓取工具都已完成。如果这是真的get()可以返回null而不阻塞 – naikus 2010-08-04 06:07:32

+0

边界可以是一个固定容量的阻塞队列。该容量的一个很好的候选者是数字挖掘者号码 – 2010-08-04 18:12:52

2

我想为您的使用情况下,基本构建块是“锁”,类似于CountDownLatch,但不像CountDownLatch,一个允许递增计数以及。

这种锁的接口可能是

public interface Latch { 
    public void countDown(); 
    public void countUp(); 
    public void await() throws InterruptedException; 
    public int getCount(); 
} 

用于计数的合法值是0以上。 await()方法会让你阻塞,直到计数降到零。

如果你有这样的闩锁,你的用例可以很容易地描述。我也怀疑这个解决方案中的队列(边界)可以被删除(无论如何,执行者提供了一个,所以它有点多余)。我会重写你的主程序为

ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers); 
Latch latch = ...; // instantiate a latch 
URL[] initialUrls = ...; 
for (URL url: initialUrls) { 
    executor.execute(new URLCrawler(this, url, latch)); 
} 
// now wait for all crawling tasks to finish 
latch.await(); 

你URLCrawler将使用锁以这种方式:

public class URLCrawler implements Runnable { 
    private final Latch latch; 

    public URLCrawler(..., Latch l) { 
     ... 
     latch = l; 
     latch.countUp(); // increment the count as early as possible 
    } 

    public void run() { 
     try { 
      List<URL> secondaryUrls = crawl(); 
      for (URL url: secondaryUrls) { 
       // submit new tasks directly 
       executor.execute(new URLCrawler(..., latch)); 
      } 
     } finally { 
      // as a last step, decrement the count 
      latch.countDown(); 
     } 
    } 
} 

至于闩实现,可以有许多可能的实现,从一个很基于wait()和notifyAll(),一个使用Lock和Condition的实现,使用AbstractQueuedSynchronizer。我认为所有这些实现都非常简单。请注意,wait() - notifyAll()版本和锁定条件版本将基于互斥,而AQS版本将使用CAS(比较和交换),因此在某些情况下可能会更好地扩展。

+0

你的自定义闩锁看起来非常像信号量...为什么不使用它? – assylias 2012-12-07 09:38:00

+0

是的,肯定有相似之处。从vanilla信号量中缺少的一件事就是await()方法,信号量术语中的await()方法可能会阻塞,直到所有许可证被释放。人们可以通过结合信号量和倒数锁存器来创建这个。 – sjlee 2012-12-07 20:59:16

0

我想推荐一个AdaptiveExecuter。根据特征值,您可以选择序列化或并行化执行的线程。在下面的示例中,PUID是我想用来做出该决定的字符串/对象。您可以更改逻辑以适合您的代码。代码的某些部分被注释以允许进一步的实验。

class AdaptiveExecutor实现执行器{0} {0} {0} {0} Runnable active; // ExecutorService threadExecutor = Executors.newCachedThreadPool(); static ExecutorService threadExecutor = Executors.newFixedThreadPool(4);

AdaptiveExecutor() { 
    System.out.println("Initial Queue Size=" + tasks.size()); 
} 

public void execute(final Runnable r) { 
    /* if immediate start is needed do either of below two 
    new Thread(r).start(); 

    try { 
     threadExecutor.execute(r); 
    } catch(RejectedExecutionException rEE) { 
     System.out.println("Thread Rejected " + new Thread(r).getName()); 
    } 

    */ 


    tasks.offer(r); // otherwise, queue them up 
    scheduleNext(new Thread(r)); // and kick next thread either serial or parallel. 
    /* 
    tasks.offer(new Runnable() { 
     public void run() { 
      try { 
       r.run(); 
      } finally { 
       scheduleNext(); 
      } 
     } 
    }); 
    */ 
    if ((active == null)&& !tasks.isEmpty()) { 
     active = tasks.poll(); 
     try { 
      threadExecutor.submit(active); 
     } catch (RejectedExecutionException rEE) { 
      System.out.println("Thread Rejected " + new Thread(r).getName()); 
     } 
    } 

    /* 
    if ((active == null)&& !tasks.isEmpty()) { 
     scheduleNext(); 
    } else tasks.offer(r); 
    */ 
    //tasks.offer(r); 

    //System.out.println("Queue Size=" + tasks.size()); 

} 

private void serialize(Thread th) { 
    try { 
     Thread activeThread = new Thread(active); 

     th.wait(200); 
     threadExecutor.submit(th); 
    } catch (InterruptedException iEx) { 

    } 
    /* 
    active=tasks.poll(); 
    System.out.println("active thread is " + active.toString()); 
    threadExecutor.execute(active); 
    */ 
} 

private void parallalize() { 
    if(null!=active) 
     threadExecutor.submit(active); 
} 

protected void scheduleNext(Thread r) { 
    //System.out.println("scheduleNext called") ; 
    if(false==compareKeys(r,new Thread(active))) 
     parallalize(); 
    else serialize(r); 
} 

private boolean compareKeys(Thread r, Thread active) { 
    // TODO: obtain names of threads. If they contain same PUID, serialize them. 
    if(null==active) 
     return true; // first thread should be serialized 
    else return false; //rest all go parallel, unless logic controlls it 
} 

}

2

的问题是有点老了,但我想我已经找到了一些简单,工作液:

扩展的ThreadPoolExecutor类像下面。新功能是保持活动任务计数(不幸的是,提供getActiveCount()是不可靠的)。如果taskCount.get() == 0并且没有更多的排队任务,则表示没有任何要执行的操作并执行程序关闭。你有你的退出标准。此外,如果您创建执行者,但未能提交任何任务,也不会封锁:

public class CrawlingThreadPoolExecutor extends ThreadPoolExecutor { 

    private final AtomicInteger taskCount = new AtomicInteger(); 

    public CrawlingThreadPoolExecutor() { 
     super(8, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); 
    } 

    @Override 
    protected void beforeExecute(Thread t, Runnable r) { 

     super.beforeExecute(t, r); 
     taskCount.incrementAndGet(); 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 

     super.afterExecute(r, t); 
     taskCount.decrementAndGet(); 
     if (getQueue().isEmpty() && taskCount.get() == 0) { 
      shutdown(); 
     } 
    } 
} 

你必须做的一件事是在它不断引用的方式实现你的RunnableExecutor您正在使用以便能够提交新任务。这是一个模拟:

public class MockFetcher implements Runnable { 

    private final String url; 
    private final Executor e; 

    public MockFetcher(final Executor e, final String url) { 
     this.e = e; 
     this.url = url; 
    } 

    @Override 
    public void run() { 
     final List<String> newUrls = new ArrayList<>(); 
     // Parse doc and build url list, and then: 
     for (final String newUrl : newUrls) { 
      e.execute(new MockFetcher(this.e, newUrl)); 
     } 
    } 
}