2017-04-04 24 views
3

Java 8集合提供了以流形式获取集合的功能。然而,一旦我们调用stream()方法,我们就可以获得当前集合的内容。 如果我的收藏在流处理过程中增长会怎样?流上的操作可能会使用更多数据更新集合。有没有简单的&有效的方式来处理这种情况?如何处理动态集合作为流?

(我试过Stream.concat()从流处理操作中,但我得到异常:在线程异常“主要” java.lang.IllegalStateException:流已在运行或关闭)

隔空具体的例子,假设我有一个URL的并发队列。

Queue<Url> concurrentUrlQue= initUrlQueue(); 

现在我想获得此URL队列的流,并处理的URL一个接一个。该过程包括从队列中删除url,读取url指向的网页,从页面提取URL并将这些URL添加到并发队列中。

concurrentUrlQue.stream().forEach((url)->readAndExtractUrls(url, concurrentUrlQue)); 

我希望能够以流的形式处理上述动态增长的队列。 (另外,我希望能够使用并行流处理这个动态队列)

有没有一种简单的方法来实现这个使用Java流?

+0

,您应该咨询[javadoc的的'java.util.Spliterator'(http://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html)。这很漫长,但是可以解决你的问题。 – glee8e

+0

我会通过它,但你能暗示我spliterator如何解决我试图解决的问题? – RajatJ

+0

所以你有一个URL可能会产生更多的URL,可能会产生更多的URL,并且你希望它们都在同一个集合中?这听起来像是你想要某种可以并行运行的递归。 Spliterator听起来很有趣。 –

回答

4

您需要编写分隔符来阻止等待新元素。

class QueueSpliterator<T> extends Spliterators.AbstractSpliterator<T> { 

    private final BlockingQueue<T> queue; 

    public QueueSpliterator(BlockingQueue<T> queue) { 
    super(Long.MAX_VALUE, 0); 
    this.queue = queue; 
    } 

    public boolean tryAdvance(Consumer<? super T> action) { 
    try { 
     T element = queue.take(); 
     action.accept(element); 
     return true; 
    } catch (InterruptedException e) { 
     return false; 
    } 
    } 
} 

然后,您使用该分割器创建一个流,并像普通的无限流一样处理它。

public class Main { 
    public static void main(String... args) { 
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000); 

    new Thread(() -> { 
     for (int i = 0; i < 1000; ++i) { 
     try { 
      queue.put(i); 
     } catch (InterruptedException e) { 
      throw new RuntimeException(e); 
     } 
     } 
    }).start(); 


    Spliterator<Integer> queueSpliterator = new QueueSpliterator<>(queue); 
    Stream<Integer> stream = StreamSupport.stream(queueSpliterator, false); 

    stream.forEach(System.out::println); 
    } 
} 
+0

This Works,thanks。此外,这是一个伟大的学习练习给我.. – RajatJ

+0

令人惊叹的答案谢谢你。 – jophde