2010-01-27

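Does anyone know of an open-source BufferedIterator, where the next N elements are eagerly fetched on a background thread? Here is an implementation from a TechRepublic article, but I assume it has not been thoroughly tested.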

Iterators.buffer(Iterator toBuffer, int bufferSize) would be a nice addition to Guava. Has it been considered?
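For comparison, the requested behaviour can be sketched with nothing but java.util.concurrent, assuming one dedicated producer thread per iterator is acceptable (the answer below avoids tying up a thread by resubmitting a task to an ExecutorService instead). The class name `PrefetchIterator` is hypothetical, and this sketch does not handle exceptions thrown by the source iterator.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper: eagerly fetches up to `capacity` elements of `source`
// on a dedicated daemon thread. Source exceptions are NOT handled here.
public final class PrefetchIterator<E> implements Iterator<E> {
    private static final Object END = new Object();   // end-of-data marker
    private static final Object NULL = new Object();  // stands in for null elements

    private final BlockingQueue<Object> queue;
    private Object next;  // element fetched by hasNext() but not yet returned

    public PrefetchIterator(Iterator<? extends E> source, int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                while (source.hasNext()) {
                    E e = source.next();
                    // put() blocks while the buffer is full
                    queue.put(e == null ? NULL : e);
                }
                queue.put(END);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, "prefetch-producer");
        producer.setDaemon(true);  // do not keep the JVM alive
        producer.start();
    }

    @Override
    public boolean hasNext() {
        if (next == null) {
            try {
                next = queue.take();  // waits for the producer if the buffer is empty
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return next != END;
    }

    @Override
    @SuppressWarnings("unchecked")
    public E next() {
        if (!hasNext()) throw new NoSuchElementException();
        Object result = next;
        next = null;
        return result == NULL ? null : (E) result;
    }
}
```

The blocking `put()` makes the producer naturally pause when `capacity` elements are waiting, which is the simplest correct way to bound the buffer; the cost is one thread parked for the lifetime of each unfinished iterator.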


This might be a reasonable Guava feature request: http://code.google.com/p/guava-libraries/issues/entry – 2010-01-28 16:43:52


Done: http://code.google.com/p/guava-libraries/issues/detail?id=318 – 2010-01-28 18:54:48

Answer


The linked implementation appears to have been written for Java 4. It can be simplified a bit using Guava and java.util.concurrent:

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.collect.AbstractIterator;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public final class Iterators2 {
    public static <E> Iterator<E> buffer(final Iterator<E> source,
                                         int capacity) {
        return buffer(source, capacity, defaultExecutor);
    }

    public static <E> Iterator<E> buffer(final Iterator<E> source,
                                         int capacity,
                                         final ExecutorService exec) {
        if (capacity <= 0) return source;
        final BlockingQueue<E> queue = new ArrayBlockingQueue<E>(capacity);

        // Temporary storage for an element we fetched but could not fit in the queue
        final AtomicReference<E> overflow = new AtomicReference<E>();
        final Runnable inserter = new Runnable() {
            @SuppressWarnings("unchecked")
            public void run() {
                E next = (E) END_MARKER;
                if (source.hasNext()) {
                    next = source.next();
                    // ArrayBlockingQueue does not allow nulls
                    if (next == null) next = (E) NULL_MARKER;
                }
                if (queue.offer(next)) {
                    // Keep buffering elements as long as we can
                    if (next != END_MARKER) exec.submit(this);
                } else {
                    // Save the element. This also signals to the
                    // iterator that the inserter task has stopped.
                    overflow.set(next);
                }
            }
        };
        // Fetch the first element.
        // The inserter will resubmit itself as necessary to fetch more elements.
        exec.submit(inserter);
        return new AbstractIterator<E>() {
            // If the inserter parked an element, move it into the queue and
            // restart the inserter. Only called when the queue has a free slot
            // and the inserter has stopped, so put() cannot block.
            private void drainOverflow() throws InterruptedException {
                E overflowElem = overflow.getAndSet(null);
                if (overflowElem != null) {
                    queue.put(overflowElem);
                    exec.submit(inserter);
                }
            }

            @Override
            protected E computeNext() {
                try {
                    E next;
                    // Poll with a timeout instead of take(): the inserter may publish
                    // its overflow element just after our last check (e.g. with
                    // capacity 1), and a plain take() would then block forever.
                    while ((next = queue.poll(10, TimeUnit.MILLISECONDS)) == null) {
                        drainOverflow();
                    }
                    // We just removed an element, so there is now a free slot
                    drainOverflow();
                    if (next == END_MARKER) {
                        return endOfData();
                    } else if (next == NULL_MARKER) {
                        return null;
                    } else {
                        return next;
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return endOfData();
                }
            }
        };
    }

    private Iterators2() {
        throw new AssertionError(Iterators2.class + " is a static class and cannot be instantiated");
    }

    // Daemon threads, so pending buffering tasks do not keep the JVM alive
    private static final ExecutorService defaultExecutor =
        Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());

    private static final Object END_MARKER = new Object();

    private static final Object NULL_MARKER = new Object();
}

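Note: the implementation above does not attempt to handle exceptions from the source iterator. If one is thrown, the inserter task will terminate abruptly, causing the calling thread to hang.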
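The abrupt termination is easy to miss because `ExecutorService.submit` does not let a task's exception escape: it is captured in the returned `Future`, which the code above discards. A minimal JDK-only demonstration (the failing iterator is made up for illustration):

```java
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitSwallowsException {
    // Returns the exception thrown by a task run via submit(), as seen through Future.get(),
    // or null if the task completed normally.
    static Throwable failureOf(Runnable task) throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Future<?> f = exec.submit(task);  // nothing is printed or rethrown here
            try {
                f.get();
                return null;
            } catch (ExecutionException e) {
                return e.getCause();  // the original exception thrown by the task
            }
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical source iterator whose next() fails, like a broken data source
        Iterator<String> failing = new Iterator<String>() {
            public boolean hasNext() { return true; }
            public String next() { throw new IllegalStateException("source failed"); }
        };
        Throwable t = failureOf(() -> failing.next());
        System.out.println(t);  // prints java.lang.IllegalStateException: source failed
    }
}
```

One possible remedy (not shown) would be for the inserter to catch the exception and enqueue it wrapped in a marker object, so that `computeNext()` can rethrow it on the caller's thread.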


Why would you accept an ExecutorService? When would you want anything other than a daemon thread factory, like the one you use by default? – 2010-01-28 01:49:04


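You might want to allocate threads from a fixed-size pool. Or you might want to change the default priority. Or you might want to keep track of all the threads in case you need to kill them when a database connection drops. It is simpler to use the existing interfaces (ExecutorService and ThreadFactory) than to add a bunch of overloads for all the different options. – finnw 2010-01-28 02:14:01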
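To illustrate that point, each of the options listed can be expressed through the standard `ThreadFactory` and `ExecutorService` interfaces without any new overloads. This is a sketch: the `TrackingThreadFactory` name, the pool size of 2, and the minimum priority are arbitrary choices for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical ThreadFactory: daemon threads with a chosen priority,
// remembered so they can be interrupted later (e.g. when a connection drops).
public class TrackingThreadFactory implements ThreadFactory {
    private final int priority;
    private final AtomicInteger count = new AtomicInteger();
    private final List<Thread> threads = new CopyOnWriteArrayList<>();

    public TrackingThreadFactory(int priority) {
        this.priority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, "buffer-" + count.incrementAndGet());
        t.setDaemon(true);
        t.setPriority(priority);
        threads.add(t);
        return t;
    }

    // Interrupt every thread this factory has created.
    public void interruptAll() {
        for (Thread t : threads) t.interrupt();
    }

    public List<Thread> threads() {
        return threads;
    }

    public static void main(String[] args) throws Exception {
        TrackingThreadFactory factory = new TrackingThreadFactory(Thread.MIN_PRIORITY);
        // Fixed-size pool of 2 threads instead of an unbounded cached pool
        ExecutorService exec = Executors.newFixedThreadPool(2, factory);
        exec.submit(() -> System.out.println(Thread.currentThread().getName())).get();
        exec.shutdown();
    }
}
```

An executor built this way could then be passed as the third argument of `Iterators2.buffer`.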


Note that this implementation does not properly handle exceptions from the source iterator. – 2010-02-01 19:19:20