2015-09-07 116 views
32

如何在Java 8 Stream上实现“分区”操作?通过分区我的意思是,将一个流分成给定大小的子流。不知何故,它将与Guava Iterators.partition()方法相同,只是希望分区是懒惰评估的Streams而不是List的。对Java 8 Stream进行分区

+6

制作懒洋洋地评估分区是在我的经验一般是不可行 - 你会想到,如果你保持几个分区引用发生,然后按顺序访问它们? –

+3

@JonSkeet - 特别是如果它们是平行的。 – OldCurmudgeon

+0

谢谢你的观点,乔恩,我怀疑这一点。你认为下面我自己的答案中的非懒惰实现是最优的吗? – Trader001

回答

25

这是不可能的任意源流到固定大小的批次分区,因为这将螺钉了并行处理。并行处理时,您可能不知道分割后的第一个子任务中有多少个元素,因此在完成第一个子任务的处理之前,您无法为下一个子任务创建分区。

但是,可以从随机访问List创建分区流。这样的功能是可用的,例如,在我StreamEx库:

List<Type> input = Arrays.asList(...); 

Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize); 

或者,如果你真的想流的流:

Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream); 

如果你不想依赖于第三方库,可以实现手动等ofSubLists方法:

public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) { 
    if (length <= 0) 
     throw new IllegalArgumentException("length = " + length); 
    int size = source.size(); 
    if (size <= 0) 
     return Stream.empty(); 
    int fullChunks = (size - 1)/length; 
    return IntStream.range(0, fullChunks + 1).mapToObj(
     n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length)); 
} 

此实现看起来有点长,但考虑到像克罗一些极端情况se-to-MAX_VALUE列表大小。


如果你想为无序流并行友好的解决方案(这样你就不会在意这些流元素将在单批合并),你可以使用收集这样的(感谢@sibnick灵感) :

public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize, 
        Collector<List<T>, A, R> downstream) { 
    class Acc { 
     List<T> cur = new ArrayList<>(); 
     A acc = downstream.supplier().get(); 
    } 
    BiConsumer<Acc, T> accumulator = (acc, t) -> { 
     acc.cur.add(t); 
     if(acc.cur.size() == batchSize) { 
      downstream.accumulator().accept(acc.acc, acc.cur); 
      acc.cur = new ArrayList<>(); 
     } 
    }; 
    return Collector.of(Acc::new, accumulator, 
      (acc1, acc2) -> { 
       acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc); 
       for(T t : acc2.cur) accumulator.accept(acc1, t); 
       return acc1; 
      }, acc -> { 
       if(!acc.cur.isEmpty()) 
        downstream.accumulator().accept(acc.acc, acc.cur); 
       return downstream.finisher().apply(acc.acc); 
      }, Collector.Characteristics.UNORDERED); 
} 

用例:

List<List<Integer>> list = IntStream.range(0,20) 
            .boxed().parallel() 
            .collect(unorderedBatches(3, Collectors.toList())); 

结果:

[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]] 

这样的收集器完全是线程安全的,并为顺序流生成有序批次。

如果要应用中间转化为每一批,你可以使用以下版本:

public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize, 
     Collector<T, AA, B> batchCollector, 
     Collector<B, A, R> downstream) { 
    return unorderedBatches(batchSize, 
      Collectors.mapping(list -> list.stream().collect(batchCollector), downstream)); 
} 

例如,这种方法可以总结的数字在每一批对飞:

List<Integer> list = IntStream.range(0,20) 
     .boxed().parallel() 
     .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue), 
      Collectors.toList())); 
+0

我会非常感兴趣地看到像StreamEx中添加的并行解决方案(它已成为Guava和Lombok的主要项目)。更少是因为我关心并列主义,而是因为它适用于流 - StreamEx.ofSubLists要求您已经有一个折叠列表,而我的用例通常是正在进行的流,我不想折叠到一个集合中并在内存中一次全部。 – Torque

3

看起来像Jon Skeet在他的comment中显示的那样,不可能让分区懒惰。对于非延迟分区,我已经有这样的代码:

public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) { 
    final Iterator<T> it = source.iterator(); 
    final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream); 
    final Iterable<Stream<T>> iterable =() -> partIt; 

    return StreamSupport.stream(iterable.spliterator(), false); 
} 
+10

我知道这是一个老话题,但它认为值得一提 - 它不是纯粹的Java 8:'Iterators'类来自Guava。 –

0

我认为这是可能有某种形式的黑客内:

创建实用工具类批次:

public static class ConcurrentBatch { 
    private AtomicLong id = new AtomicLong(); 
    private int batchSize; 

    public ConcurrentBatch(int batchSize) { 
     this.batchSize = batchSize; 
    } 

    public long next() { 
     return (id.getAndIncrement())/batchSize; 
    } 

    public int getBatchSize() { 
     return batchSize; 
    } 
} 

和方法:

public static <T> void applyConcurrentBatchToStream(Consumer<List<T>> batchFunc, Stream<T> stream, int batchSize){ 
    ConcurrentBatch batch = new ConcurrentBatch(batchSize); 
    //hack java map: extends and override computeIfAbsent 
    Supplier<ConcurrentMap<Long, List<T>>> mapFactory =() -> new ConcurrentHashMap<Long, List<T>>() { 
     @Override 
     public List<T> computeIfAbsent(Long key, Function<? super Long, ? extends List<T>> mappingFunction) { 
      List<T> rs = super.computeIfAbsent(key, mappingFunction); 
      //apply batchFunc to old lists, when new batch list is created 
      if(rs.isEmpty()){ 
       for(Entry<Long, List<T>> e : entrySet()) { 
        List<T> batchList = e.getValue(); 
        //todo: need to improve 
        synchronized (batchList) { 
         if (batchList.size() == batch.getBatchSize()){ 
          batchFunc.accept(batchList); 
          remove(e.getKey()); 
          batchList.clear(); 
         } 
        } 
       } 
      } 
      return rs; 
     } 
    }; 
    stream.map(s -> new AbstractMap.SimpleEntry<>(batch.next(), s)) 
      .collect(groupingByConcurrent(AbstractMap.SimpleEntry::getKey, mapFactory, mapping(AbstractMap.SimpleEntry::getValue, toList()))) 
      .entrySet() 
      .stream() 
      //map contains only unprocessed lists (size<batchSize) 
      .forEach(e -> batchFunc.accept(e.getValue())); 
} 
+0

你对没有证件的事实是正确的,这就是我称之为'黑客'的原因。你也是对的非原子'computeIfAbsent'。我将很快编辑代码。但为什么它不是懒惰?在处理一个批次之前,不会分配所有列表。并行批处理也是不常用的。 – sibnick

+1

对于并行流,它根本不起作用。 'applyConcurrentBatchToStream(System.out :: println,IntStream.range(0,100).boxed()。parallel(),3)'打印垃圾(随机收集的组,一些重复的元素,甚至不同组之间的运行次数)。对于只有顺序的流,有更简单和更高效的解决方案(如OP提供的解决方案)。 –

+0

但是你也显示了bug的来源:非原子'computeIfAbsent'。 – sibnick

6

只要你想使用流顺序,就可以划分流(以及进行相关的功能,如窗口 - 我认为这是你真正想要的在这种情况下)。将支持partitoning的标准流 两个库cyclops-react(我的作者)和jOOλ其中独眼巨人反应的扩展(添加功能,如窗)。

cyclops-streams有一组用于在Java Streams上操作的静态函数StreamUtils以及一系列函数,如splitAt,headAndTail,splitBy,用于分区的分区。

要窗口流引入嵌套流的数据流大小30的可以使用窗口方法。

对于OPs点,在Streaming术语中,将Stream拆分为给定大小的多个Stream是Windowing操作(而不是分区操作)。

Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30); 

有一个名为ReactiveSeq延伸jool.Seq,并增加了窗口化功能流扩展类,可以使代码干净了一点。

ReactiveSeq<Integer> seq; 
    ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30); 

正如Tagir指出的那样,尽管如此,这并不适合并行Streams。如果你想窗口或批处理中的流要在多线程的方式执行。 LazyFutureStream在cyclops-react可能是有用的(窗是待办事项清单上,但普通的老式配料现已上市)。

在这种情况下,数据将从执行Stream的多个线程传递到多生产者/单消费者等待空闲队列,并且来自该队列的顺序数据可以在再次分配给线程之前进行窗口化。

Stream<List<Data>> batched = new LazyReact().range(0,1000) 
               .grouped(30) 
               .map(this::process); 
0

下面是AbacusUtil

IntStream.range(0, Integer.MAX_VALUE).split(size).forEach(s -> N.println(s.toArray())); 

宣言快速的解决方案:我AbacusUtil的开发商。

0

对于这个问题,我发现的最优雅的纯Java 8解决方案:

public static <T> List<List<T>> partition(final List<T> list, int batchSize) { 
return IntStream.range(0, getNumberOfPartitions(list, batchSize)) 
       .mapToObj(i -> list.subList(i * batchSize, Math.min((i + 1) * batchSize, list.size()))) 
       .collect(toList()); 
} 

//https://stackoverflow.com/questions/23246983/get-the-next-higher-integer-value-in-java 
private static <T> int getNumberOfPartitions(List<T> list, int batchSize) { 
    return (list.size() + batchSize- 1)/batchSize; 
}