2011-05-05 55 views
3

我想有一个可以被多个线程并发读取的迭代器,这样我就可以并行处理迭代器源数据。面临的挑战是,我无法将hasNext()与其逻辑next()结合在一起,因为这些可以转到不同的线程。 (也就是说,两个线程可以调用hasNext(),每个都是正确的,然后让第二个线程失败,因为只有一个项目。)我的问题是,对于某些源,我不知道它是否有下一个元素,直到我尝试阅读它。一个这样的例子是从文件中读取行;另一个是从Lucene索引读取Term实例。如何在某些源上创建并发迭代器?

我正在考虑在迭代器中设置一个队列,并为队列提供一个单独的线程。那样,hasNext()是根据队列大小实现的。但我不明白我怎么能保证队列被填满,因为那个线程可能会饿死。

我应该忽略迭代器合约,只需拨打next()直到NoSuchElementException被抛出?

有更好的方法来处理这个问题吗?

回答

6

您的线程是否可以从BlockingQueue而不是Iterator中拉取。正如你所发现的,迭代器不适合并发访问。

传递一个LinkedBlockingQueue,让你的线程执行queue.poll()直到没有任何东西离开。

+0

感谢您的快速响应。这是有道理的,但我需要以确保它不饿的方式填补队列,对吗?我无法将所有元素放入队列中,因为可能太多元素不适合内存。我想除了队列之外,我还必须有一个'AtomicBoolean',它表示没有更多的记录会被添加到队列中。 – 2011-05-05 05:53:37

+0

你可以让你的线程做一个queue.take(),并且有一些毒丸(http://www.javaspecialists.eu/archive/Issue016.html)来表明没有什么可做的。 – sbridges 2011-05-05 06:00:57

+0

难道这仍然不会有多个线程调用'take()'并让其中一个获得毒丸,而另一个线程挂在一个空队列上?这意味着'take()'必须在主线程中完成,然后将值分派给工作线程。或者我错过了什么? – 2011-05-05 06:32:27

1

我想到了一个解决方法/转义,以保留(大部分)合同并避免NoSuchElementExceptionsiterator.next()可能会返回一个自定义的“迭代结束”标记对象,该对象可以被处理但不过是一个假。因此,如果一个线程收到truehasNext(),但另一个线程已经抓住最后一个项目,那么第一个线程将得到一个虚拟(而不是一个例外)。

你应该能够在所有正常使用情况下使用这种迭代器,单线程使用甚至应该注意到这种差异。应该可以使用增强型for循环。

只有当您尝试等待NoSuchElementException而不是检查hasNext()时,它才会失败,因为该异常不会因为虚拟项目而被抛出。

+0

是的,我用这个队列。在这种情况下,我所做的是在虚拟机出现时,工作线程忽略该工作,但将虚拟机放回队列中。这样所有的工人最终都能得到它。 – 2011-05-07 07:11:44

-1

我可能会错过这一点,但是不能在这种情况下使用同步块吗?

synchronized(iterator) 
{ 
    if (iterator.hasNext()) element = iterator.next(); 
} 

在这里,当一个线程正在使用迭代器时,其他线程将无法访问它。

+0

在某些情况下,iterator.next()方法会做很多工作,这很好并行化。 – 2011-05-05 06:38:20