2011-01-06 61 views
4

我目前有一个并发队列实现,它使用BlockingQueue作为数据存储。我现在需要引入具有更高优先级的第二种类型的对象,使我朝向原始队列的饥饿/优先级队列。所以我们正在处理由多个线程产生的类型A和类型B的对象。 B类型的任何对象都应在A类型的对象之前进行处理,但必须保持FIFO顺序以外的其他对象。所以如果插入{1A,1B,2A,3A,2B},顺序应该是{1B,2B,1A,2A,3A}协调多个并发队列

我试过一个单一的PriorityBlockingQueue将B型推到前面,没有保持先入先出的要求(相同类型的项目之间没有自然顺序)。

我的下一个想法是使用两个并发队列。在协调两个队列之间的访问时,我正在寻找共同的问题或注意事项。理想情况下,我愿意做这样的事情:

public void add(A a) 
    { 
     aQueue.add(a); 
    } 
    public void add(B b) 
    { 
     bQueue.add(b); 
    } 

    private void consume() 
    { 
     if(!bQueue.isEmpty()) 
      process(bQueue.poll()); 
     else if(!aQueue.isEmpty()) 
      process(aQueue.poll()); 
    } 

我是否需要任何同步或锁定,如果这两个队列ConcurrentLinkedQueue(或在这里插入比较合适的结构)?注意我有很多制作者,但只有一个消费者(单线程ThreadPoolExecutor)。

编辑:如果一个B在isEmpty()检查之后进来,那么可以处理一个A并在下一个consume()调用中处理它。

+3

在A对象之前处理B对象有多重要?在你当前的代码中,如果在调用isEmpty()之后插入B对象,你会错过它。有两种不同的答案,这取决于你必须做什么...... – 2011-01-06 16:55:19

+0

@Jonathan不重要。我的意思是补充说,不知怎么就把它排除在外了。问题已更新。 – 2011-01-06 17:01:30

回答

3

我不确定我是否确实了解您的情况,但我认为应该可以使用单个队列解决此问题。

你说你的对象(在队列中)应该通过自然顺序和类型进行比较。如果没有自然顺序,只需要一个序列生成器(即AtomicLong),它将为您的对象提供唯一的,始终递增的队列ID。从AtomicLong获取数据应该不会花费时间,除非您处于纳秒级的世界。

所以你Comparator.compare应该是这样的:

1)检查的对象类型。如果不同(A VS B),则返回1/-1。否则,见下面
2)检查ID。它保证是不同的。

如果您无法更改对象(A和B),仍然可以将它们包装到另一个包含该ID的对象中。

+0

我唯一关心的是如何包装和使用增量,是在这里推送的绝对数量的对象。 – 2011-01-06 17:07:17

1

您需要同步getQueueItem()方法。

用A和B实现QueueItem接口。

public void add(A a) 
    { 
     aQueue.add(a); 
    } 
    public void add(B b) 
    { 
     bQueue.add(b); 
    } 

    private void consume() 
    { 
     process(getNextItem()); 
    } 

    private QueueItem getNextItem() 
    { 
     synchronized(bQueue) { 
      if(!bQueue.isEmpty()) return bQueue.poll(); 
      return aQueue.poll(); 
     } 
    } 
+0

感谢您的回答。我将尝试@mindas建议的单个队列,但我最终可能会回到两个队列。 – 2011-01-06 18:01:14

3

根据您的要求,您的方法应该可以正常工作。

我会对您的代码进行以下更改。这会阻塞你的getNextItem(),直到其中一个队列为你返回一个对象。

private Object block = new Object(); 

public void add(A a) 
{ 
    synchronized(block) 
    { 
     aQueue.add(a); 
     block.notifyAll(); 
    } 
} 

public void add(B b) 
{ 
    synchronized(block) 
    { 
     bQueue.add(b); 
     block.notifyAll(); 
    } 
} 

private Object consume() 
{ 
    Object value = null 
    synchroinzed(block) 
    { 
     while (return == null) 
     { 
      value = bQueue.poll(); 
      if (value == null) value = aQueue.poll(); 
      if (value == null) block.wait(); 
     } 
    } 

    return value; 
}