2017-08-24 130 views
2

即时构建一个多线程应用程序,使用WorkerThreads处理来自BlockingQueues的任务。 worker看起来像follws(作为抽象类,子类实现processItem())。WorkerThread:等待处理完成(BlockingQueue)

abstract class WorkerThread extends Thread { 
    BlockingQueue<Task> q; 
    int tasksInSystem; // globally available 

    public void run() { 
     while(!interrupted()) { 
      Task t = q.take(); 
      process(t); 
      tasksInSystem--; 
     } 
    } 

    abstract void process(Task t); 
} 

特别的是,我想等待所有任务完成。 我的第一个想法是:

  • 数每增加一个任务
  • 降低用计数器处理完成后。

但是: 但也有不同的任务,不同的工人实现和多队列。所以我将不得不维护大量不同的柜台。

我想什么有:

q.waitForEmptyAndCompleted()

这需要排队跟踪“飞行”的任务,并要求工作进程,当他们完成信号(而不是tasksInsystem---;)。

工作人员无法增加该计数器,因为他必须在将他们从队列中取出之后对其进行计数。但是另一个线程可能会在take()调用之后立即运行,以至于工作人员无法事先增加计数器。

因此,计数器增加和take()必须连在一起(atomar)。这导致我到一个专门的BlockingQueue。

我没有找到预先制定的解决方案。所以我最好的猜测是实现我自己的BlockingQueue。是否有我可以使用的东西(为了避免自己实现并测试线程安全的阻塞队列)?或者你有什么想法以不同的方式实施这种等待电话?

+2

你可以使用'ExecutorService'并调用shutdown吗? Executor服务将管理所有任务,您需要将队列与它关联。关机将阻止,直到所有任务完成。 – hgrey

+0

嘿,谢谢你的回答。我接近实际使用ExecutorService。但是当我调用shutdown()时,整个服务停止。我实际上想重用队列。 waitForEmptyAndComplete()后发生的事情将触发新的任务。所以我必须为每一轮设置一个新的ExecutorService。至少,它不会强迫我实现我自己的Queue-Variant。 – markus

回答

1

好的,因为一般ExecutorService是不够的,或许ForkJoinPool将工作,它不显式公开队列,但应该是非常容易使用给你所描述的。

关键方法是awaitQuiescence(long timeout, TimeUnit unit)它将等待,直到所有提交的任务已完成执行。