2011-05-18 59 views
1

在我重新开始之前,普通Java中是否有类似主题的并发队列?我有以下要求:普通Java中的类似主题的并发队列

  • 多读者/消费者
  • 多作家/制片
  • 每个消息必须由每一个(活动)消费者消费
  • 后每一位消费者读取消息时,它应该成为垃圾(即,没有更多的引用)
  • 写入到队列中不应该是O(N)给消费者的
  • 并行,优选非阻塞
  • 数0
  • 不是基于JMS:它适用于更轻/可嵌入的环境

这几乎是我所需要的一切。任何指针?

+0

我不知道你是什么意思关于“纯Java”,但EventBus(http://eventbus.org/)可能是检查出来的东西。 – I82Much 2011-05-18 22:14:14

+0

谢谢,我检查了EventBus。这是一个不错的项目,但并不完全满足我的需求。 – 2011-05-29 00:42:54

回答

3

基本上你所谈论的复用,并没有标准库中没有东西,但创建一个非常简单。假设你的客户不感兴趣的发布的消息,他们订阅那么之前你需要队列的池为每个消费者和出版物只是提供该项目给每个队列:

public class Multiplexer<M> { 
    private final List<BlockingQueue<M>> consumers 
    = new CopyOnWriteArrayList<BlockingQueue<M>>(); 

    public void publish(M msg) { 
    for (BlockingQueue<M> q : consumers) { 
     q.offer(msg); 
    } 
    } 

    public void addConsumer(BlockingQueue<M> consumer) { 
    consumers.add(consumer); 
    } 
} 

该版本允许用户使用任何阻塞队列他们可能想要的实现。如果你愿意,你显然可以为客户提供一个标准的实现和一个不错的界面。

+0

这是有效的,假设您确实阻止了“offer”调用的消费者。对于任何“倾听者”模式来说,这是相同的建议 - 听众应该尽可能快地执行他们的行为,这可能意味着另一个线程应该完成工作。如果这是一种常见的情况,那么多路复用器应该为每个消费者(或某种线程池)创建一个线程来管理消息的发送。 – AngerClown 2011-05-18 23:48:34

+0

谢谢,但这种解决方案增加了与消费者线性发布的时间。我编辑了这个帖子来说明这一点。 – 2011-05-18 23:49:00

+0

@AngerClown阻止“offer(e)”的BlockingQueue实现从根本上被破坏。对于监听者动作和线程,消费者在他们想要的时候轮询/从队列中取出。 – 2011-05-19 01:05:17

0

第三状态不是纯Java的,但你可以使用一个nonblocking linked queue一个单独的头为每个消费者(你可以依靠GC收集未引用节点)

+0

非常有趣的论文。我将执行它以与我的解决方案进行比较。 – 2011-05-29 00:38:11

0

最简单的策略是将消息传递给每个消费者,我不会有那么多的消费者,消费者的数量是重要的。您可以在几秒钟内向几十个消费者添加消息。

避免这种情况的一种方法是在许多阅读器中设置一个圆形环形缓冲区。这实施起来很棘手,意味着消费者的消息来源数量会受到限制。

0

只有一个伪消费者,让真正的消费者注册伪消费者。当生产者发送消息时,伪消费者醒来并消费该消息。消费该消息时,伪消费者为每个向其注册的真实消费者创建单独的Runnable,并在线程池中执行它们。