2012-02-24 132 views
0

我试图找到一种方法,如何在多个线程中加载多个线程并具有一定的最大线程限制,以某种方式完成后加载新页面。在下载页面之后,还应该为加载的内容创建另一个后期处理线程,以便整个过程链接在一起。Java任务队列,线程池和具有回调的线程通知新任务何时可以启动

我怎么想使它:

  • 任务队列认为,应下载
  • 线程池有一定的数量的线程下载任务队列的页面(加载网页采取的网页一段时间,所以 纱的数量可以比
  • 当完成一个页面的下载,线程应告知这一点,以便从队列中一个新的任务可以代替开始
  • CPU内核的数量)要高得多

    当一个页面的下载完成后,内容应该被转移到另一个任务队列中进行后期处理

  • 另一个线程池的线程数与CPU的线程数相同(猜测最快的是每个线程只有一个线程对于后期处理),此线程池将在下载的页面上执行后期处理。

  • 当完成一个页面的处理后,线程应该通知它,以便在队列中的另一个页面可以进行后处理

  • 当所有的网页已经被下载(队列为空),则第一个线程池可以被关闭,其他的线程池可以当两个任务队列为空(所有页面已被下载和后处理)

我有类似关机:

  for (int j = 0; j < threads.length; j++) { 
      threads[j].start(); 
     } 

     for (int j = 0; j < threads.length; j++) { 
      threads[j].join(); 
     } 

但这样所有页面加载是在同一时间在单独的线程,我想限制线程的数量。更重要的是,我想重用这些线程,并让一个线程完成下一个任务。我可以用while循环做到这一点,但这是我想要避免的,我不想用while循环来检查队列是否有更多的任务,以及线程是否空闲。是否有可能使用某种回调,以便线程通知已完成的池并返回数据。 我还希望下载任务将内容存储在数据结构中,并将其添加到后处理任务队列中。

我迄今发现的最好的资源是: Thread pools Callback

但我不知道如果它甚至可以创建它就是我想要的。我'坚持思考函数指针。

回答

3

不要使用低级线程方法来做到这一点。有一个downloadExecutor线程池,并将DownloadTask实例(实施RunnableCallable)提交到此池。

在DownloadTask代码的末尾,将PostProcessPageTask实例(再次实现RunnableCallable)提交到第二个postProcessExecutor线程池。

您可以使用一个或两个CountDownLatch实例,每个任务在完成时递减,并让主线程等待此(或这些)锁存器以知道何时线程池必须关闭。

有关更多信息,请参阅http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/Executors.html和docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html。

+1

使用两个池有什么意义吗?为什么不在一个池中下载和处理一个任务课程。我认为将下载的数据排队到使用相同处理器内核的另一个池中看不到任何优势。 – 2012-02-24 12:33:40

+0

我会研究这个JB,并在稍后回复,我认为让DownloadTask向第二个执行程序提交PostProcessPageTask将完成我正在尝试完成的任务。 – user979899 2012-02-24 12:57:30

+0

@Martin: 我甚至可以连续做所有事情,而不是平行行事,但我的目的是尽量让它变得更快。从页面下载内容主要取决于传输速度(只要cpu上没有太多的其他负载),因此我可以同时下载多个页面,但后处理内容仅取决于CPU负载。因此,如果假设我一次从100个页面下载内容,则我也将有大约100个线程进行后期处理,但这样做效率不高,因为更少的线程以更快的速度完成工作会更快。 – user979899 2012-02-24 12:57:36

1

您可以使用番石榴的ListenableFutures

首先,您需要提交下载任务到ListenableExecutorService,然后通过Futures.transform将后期处理器转换为后期处理器。

ListenableExecutorService dlPool = MoreExecutors.listeningDecorator(firstPool); 
ListenableExecutorService procPool = MoreExecutors.listeningDecorator(secondPool); 

List<ListenableFuture<Result>> results = new ArrayList<...>(); 
for (String url : urls) { 
    // download task 
    ListenableFuture<String> html = dlPool.submit(...); 
    // post process 
    ListenableFuture<Result> result = Futures.transform(html, 
    new Function<String, Result>() { 
     ... // post process 
    }, procPool); 
    results.add(result); 
} 

// blocks until all results are processed 
List<Result> processed = Futures.allAsList(results).get(); 

firstPool.shutdownNow(); 
secondPool.shutdownNow(); 
-1

不要试图手工编写这种类型的通用基础设施。

Java 5中附带可爱的java.util.concurrent包

它应该是你把做多线程应用程序时的第一件事。

它有很多像线程池(它执行Runnable或Callable对象)的一般工具,并会为你做很多狗工作。

因特网上有很多关于它的大量免费资源,或者如果您更喜欢书籍,Brian Goetz的“实践中的Java并发”被广泛认为是最好的之一。