2015-04-12 56 views
8

我正在寻找一种方法来实现非终端分组操作,这样内存开销会很小。Java Streams - 有效地对已排序流上的项目进行分组

例如,考虑distinct()。在一般情况下,它别无选择,只能收集所有不同的项目,然后才将其转发。但是,如果我们知道输入流已经排序,那么可以使用最少的内存“即时”完成操作。

我知道我可以实现这个迭代器使用迭代器包装和我自己实现分组逻辑。有没有更简单的方法来实现这个使用流API而不是?

- 编辑 -

我找到了一种方法来虐待Stream.flatMap(..)来实现这一目标:

private static class DedupSeq implements IntFunction<IntStream> { 
    private Integer prev; 

    @Override 
    public IntStream apply(int value) { 
     IntStream res = (prev != null && value == prev)? IntStream.empty() : IntStream.of(value); 
     prev = value; 
     return res; 
    }  
    } 

然后:

IntStream.of(1,1,3,3,3,4,4,5).flatMap(new DedupSeq()).forEach(System.out::println); 

哪打印:

1 
3 
4 
5 

随着一些更改,可以使用相同的技术进行任何种类的高效内存有效的序列分组。无论如何,我不太喜欢这个解决方案,而且我正在寻找更自然的东西(例如像绘图或筛选工作的方式)。此外,我在此打破契约,因为提供给flatMap(..)的函数是有状态的。

+2

您总是可以使用'.filter(someSet ::添加)',但你有没有尝试过,并将这种解决方案的性能与普通的'distinct()'进行比较?另外,你会说“在一般情况下”,但是可能会有一个优化的实现方式,即Stream'_is_' ORDERED',正好(或者更准确地说,是它的底层'Spliterator') – fge

+0

@fge:我不确定那里有任何优化。代码: IntStream.range(0,100000000).distinct()。forEach(x - > {}); 内存不足,尽管潜在的Spliterator报告自己是ORDERED。 –

+1

你用'.forEachOrdered()'试过了吗? – fge

回答

4

如果你想不可变的状态到不应该有它的功能添加一个解决方案,您可以诉诸collect

static void distinctForSorted(IntStream s, IntConsumer action) { 
    s.collect(()->new long[]{Long.MIN_VALUE}, 
       (a, i)->{ if(a[0]!=i) { action.accept(i); assert i>a[0]; a[0]=i; }}, 
       (a, b)->{ throw new UnsupportedOperationException(); }); 
} 

这工作,因为它是预期的方式使用可变容器时,它不能并行工作,因为在任意流位置分裂意味着可能在两个(或更多)线程中遇到值。

如果您想要一般用途IntStream而不是forEach操作,则尽管增加了复杂性,但优选Spliterator低级解决方案。

static IntStream distinctForSorted(IntStream s) { 
    Spliterator.OfInt sp=s.spliterator(); 
    return StreamSupport.intStream(
     new Spliterators.AbstractIntSpliterator(sp.estimateSize(), 
     Spliterator.DISTINCT|Spliterator.SORTED|Spliterator.NONNULL|Spliterator.ORDERED) { 
     long last=Long.MIN_VALUE; 
     @Override 
     public boolean tryAdvance(IntConsumer action) { 
      long prev=last; 
      do if(!sp.tryAdvance(distinct(action))) return false; while(prev==last); 
      return true; 
     } 
     @Override 
     public void forEachRemaining(IntConsumer action) { 
      sp.forEachRemaining(distinct(action)); 
     } 
     @Override 
     public Comparator<? super Integer> getComparator() { 
      return null; 
     } 
     private IntConsumer distinct(IntConsumer c) { 
      return i-> { 
       if(i==last) return; 
       assert i>last; 
       last=i; 
       c.accept(i); 
      }; 
     } 
    }, false); 
} 

它甚至继承了并行支持,虽然它的工作原理是在另一个线程处理它们,所以它不会加快不同操作之前预取一些值,但也许后续操作,如果有激烈的计算那些。


完成,这里是任意的,即无序,IntStream S的不依赖于“拳击加HashMap”不同的操作从而可以有一个更好的内存占用:

static IntStream distinct(IntStream s) { 
    boolean parallel=s.isParallel(); 
    s=s.collect(BitSet::new, BitSet::set, BitSet::or).stream(); 
    if(parallel) s=s.parallel(); 
    return s; 
} 

它仅适用于正数int值;扩展到全32位范围将需要两个BitSet因此看起来并不简单,但通常情况下允许将存储器限制在31位范围甚至更低...

+0

谢谢。我现在看到一个自定义的Spliterator是实现它的方式(就像在Stackoverflow.com/q/283​​63323/1441122中,由** Stuart Marks **建议的那样)。顺便说一下,尽管在内存使用方面仍然是O(n),但最终的bitset解决方案是优雅的。 –

1

这样做适当将是打开流分成spliterator,然后把它包装取决于返回spliterator

  • 执行使用并发集合如果源既不是分选的幼稚的重复数据删除的属性的方式也不明确
  • 如果源代码分割器已排序,则执行优化的优化去除代码。
    支持trySplit操作将会非常棘手,因为它可能必须将子分割器提前几个步骤,直到它可以确定它没有看到一系列非独特元素的尾部。
  • 刚刚返回spliterator原样如果源已经明显

一旦你有spliterator你可以用相同的属性把它放回流,并继续做就可以了

流操作

由于我们无法修改现有的jdk接口,所以助手API必须看起来更像这样:dedup(IntStream.of(...).map(...)).collect(...)


如果你检查的java.util.stream.DistinctOps.makeRef(AbstractPipeline<?, T, ?>)来源,你会发现,JDK或多或少的确,对于基于引用的流。

这只是IntStream实现(java.util.stream.IntPipeline.distinct()),它采用了一种效率低下的方法,没有利用DISTINCTSORTED的优势。

它只是盲目地将IntStream转换为盒装的Integer流,并使用基于引用的重复数据删除而不传递适当的标志,从而使其具有内存效率。

如果jdk9中尚未解决这个问题,它可能是值得一个bug的,因为如果它们不必要地丢弃流标记,它本质上是不必要的内存消耗和浪费了流操作的优化潜力。