如何在Java 8 Stream上实现“分区”操作?通过分区我的意思是,将一个流分成给定大小的子流。不知何故,它将与Guava Iterators.partition()方法相同,只是希望分区是懒惰评估的Streams而不是List的。对Java 8 Stream进行分区
回答
这是不可能的任意源流到固定大小的批次分区,因为这将螺钉了并行处理。并行处理时,您可能不知道分割后的第一个子任务中有多少个元素,因此在完成第一个子任务的处理之前,您无法为下一个子任务创建分区。
但是,可以从随机访问List
创建分区流。这样的功能是可用的,例如,在我StreamEx
库:
List<Type> input = Arrays.asList(...);
Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);
或者,如果你真的想流的流:
Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);
如果你不想依赖于第三方库,可以实现手动等ofSubLists
方法:
public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
if (length <= 0)
throw new IllegalArgumentException("length = " + length);
int size = source.size();
if (size <= 0)
return Stream.empty();
int fullChunks = (size - 1)/length;
return IntStream.range(0, fullChunks + 1).mapToObj(
n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}
此实现看起来有点长,但考虑到像克罗一些极端情况se-to-MAX_VALUE列表大小。
如果你想为无序流并行友好的解决方案(这样你就不会在意这些流元素将在单批合并),你可以使用收集这样的(感谢@sibnick灵感) :
public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
Collector<List<T>, A, R> downstream) {
class Acc {
List<T> cur = new ArrayList<>();
A acc = downstream.supplier().get();
}
BiConsumer<Acc, T> accumulator = (acc, t) -> {
acc.cur.add(t);
if(acc.cur.size() == batchSize) {
downstream.accumulator().accept(acc.acc, acc.cur);
acc.cur = new ArrayList<>();
}
};
return Collector.of(Acc::new, accumulator,
(acc1, acc2) -> {
acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
for(T t : acc2.cur) accumulator.accept(acc1, t);
return acc1;
}, acc -> {
if(!acc.cur.isEmpty())
downstream.accumulator().accept(acc.acc, acc.cur);
return downstream.finisher().apply(acc.acc);
}, Collector.Characteristics.UNORDERED);
}
用例:
List<List<Integer>> list = IntStream.range(0,20)
.boxed().parallel()
.collect(unorderedBatches(3, Collectors.toList()));
结果:
[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]
这样的收集器完全是线程安全的,并为顺序流生成有序批次。
如果要应用中间转化为每一批,你可以使用以下版本:
public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
Collector<T, AA, B> batchCollector,
Collector<B, A, R> downstream) {
return unorderedBatches(batchSize,
Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}
例如,这种方法可以总结的数字在每一批对飞:
List<Integer> list = IntStream.range(0,20)
.boxed().parallel()
.collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue),
Collectors.toList()));
我会非常感兴趣地看到像StreamEx中添加的并行解决方案(它已成为Guava和Lombok的主要项目)。更少是因为我关心并列主义,而是因为它适用于流 - StreamEx.ofSubLists要求您已经有一个折叠列表,而我的用例通常是正在进行的流,我不想折叠到一个集合中并在内存中一次全部。 – Torque
看起来像Jon Skeet在他的comment中显示的那样,不可能让分区懒惰。对于非延迟分区,我已经有这样的代码:
public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) {
final Iterator<T> it = source.iterator();
final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream);
final Iterable<Stream<T>> iterable =() -> partIt;
return StreamSupport.stream(iterable.spliterator(), false);
}
我知道这是一个老话题,但它认为值得一提 - 它不是纯粹的Java 8:'Iterators'类来自Guava。 –
我认为这是可能有某种形式的黑客内:
创建实用工具类批次:
public static class ConcurrentBatch {
private AtomicLong id = new AtomicLong();
private int batchSize;
public ConcurrentBatch(int batchSize) {
this.batchSize = batchSize;
}
public long next() {
return (id.getAndIncrement())/batchSize;
}
public int getBatchSize() {
return batchSize;
}
}
和方法:
public static <T> void applyConcurrentBatchToStream(Consumer<List<T>> batchFunc, Stream<T> stream, int batchSize){
ConcurrentBatch batch = new ConcurrentBatch(batchSize);
//hack java map: extends and override computeIfAbsent
Supplier<ConcurrentMap<Long, List<T>>> mapFactory =() -> new ConcurrentHashMap<Long, List<T>>() {
@Override
public List<T> computeIfAbsent(Long key, Function<? super Long, ? extends List<T>> mappingFunction) {
List<T> rs = super.computeIfAbsent(key, mappingFunction);
//apply batchFunc to old lists, when new batch list is created
if(rs.isEmpty()){
for(Entry<Long, List<T>> e : entrySet()) {
List<T> batchList = e.getValue();
//todo: need to improve
synchronized (batchList) {
if (batchList.size() == batch.getBatchSize()){
batchFunc.accept(batchList);
remove(e.getKey());
batchList.clear();
}
}
}
}
return rs;
}
};
stream.map(s -> new AbstractMap.SimpleEntry<>(batch.next(), s))
.collect(groupingByConcurrent(AbstractMap.SimpleEntry::getKey, mapFactory, mapping(AbstractMap.SimpleEntry::getValue, toList())))
.entrySet()
.stream()
//map contains only unprocessed lists (size<batchSize)
.forEach(e -> batchFunc.accept(e.getValue()));
}
你对没有证件的事实是正确的,这就是我称之为'黑客'的原因。你也是对的非原子'computeIfAbsent'。我将很快编辑代码。但为什么它不是懒惰?在处理一个批次之前,不会分配所有列表。并行批处理也是不常用的。 – sibnick
对于并行流,它根本不起作用。 'applyConcurrentBatchToStream(System.out :: println,IntStream.range(0,100).boxed()。parallel(),3)'打印垃圾(随机收集的组,一些重复的元素,甚至不同组之间的运行次数)。对于只有顺序的流,有更简单和更高效的解决方案(如OP提供的解决方案)。 –
但是你也显示了bug的来源:非原子'computeIfAbsent'。 – sibnick
只要你想使用流顺序,就可以划分流(以及进行相关的功能,如窗口 - 我认为这是你真正想要的在这种情况下)。将支持partitoning的标准流 两个库cyclops-react(我的作者)和jOOλ其中独眼巨人反应的扩展(添加功能,如窗)。
cyclops-streams有一组用于在Java Streams上操作的静态函数StreamUtils以及一系列函数,如splitAt,headAndTail,splitBy,用于分区的分区。
要窗口流引入嵌套流的数据流大小30的可以使用窗口方法。
对于OPs点,在Streaming术语中,将Stream拆分为给定大小的多个Stream是Windowing操作(而不是分区操作)。
Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30);
有一个名为ReactiveSeq延伸jool.Seq,并增加了窗口化功能流扩展类,可以使代码干净了一点。
ReactiveSeq<Integer> seq;
ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30);
正如Tagir指出的那样,尽管如此,这并不适合并行Streams。如果你想窗口或批处理中的流要在多线程的方式执行。 LazyFutureStream在cyclops-react可能是有用的(窗是待办事项清单上,但普通的老式配料现已上市)。
在这种情况下,数据将从执行Stream的多个线程传递到多生产者/单消费者等待空闲队列,并且来自该队列的顺序数据可以在再次分配给线程之前进行窗口化。
Stream<List<Data>> batched = new LazyReact().range(0,1000)
.grouped(30)
.map(this::process);
下面是AbacusUtil
IntStream.range(0, Integer.MAX_VALUE).split(size).forEach(s -> N.println(s.toArray()));
宣言快速的解决方案:我AbacusUtil的开发商。
对于这个问题,我发现的最优雅的纯Java 8解决方案:
public static <T> List<List<T>> partition(final List<T> list, int batchSize) {
return IntStream.range(0, getNumberOfPartitions(list, batchSize))
.mapToObj(i -> list.subList(i * batchSize, Math.min((i + 1) * batchSize, list.size())))
.collect(toList());
}
//https://stackoverflow.com/questions/23246983/get-the-next-higher-integer-value-in-java
private static <T> int getNumberOfPartitions(List<T> list, int batchSize) {
return (list.size() + batchSize- 1)/batchSize;
}
- 1. Java 8 Stream,anyMatch
- 2. Java 8 stream vs List
- 3. 关闭Java 8 Stream
- 4. Java 8 Stream主要方法
- 5. 对表进行分区
- 6. Java:如何使用Stream API按条件对实体进行分组?
- 7. Java-8对集合进行排序
- 8. Amazon Kinesis Stream如何与Java Stream进行比较?
- 9. java 8 stream group by and summming double
- 10. 用StringBuilder收集Java 8 Int Stream
- 11. Java 8 Stream API收集器问题
- 12. Java 8 Stream - 编译器问题
- 13. java 8用parallelStream和stream减少
- 14. Java 8 stream min不返回期望值
- 15. 我应该对表格进行分区/子分区吗?
- 16. 如何将Stream方法分配给java 8中的函数?
- 17. 对现有表进行分区
- 18. 在Prolog中对列表进行分区
- 19. 在Racket中对列表进行分区
- 20. 在mysql中对分区进行排名(
- 21. 使用IList.Contains(分区)对多个分区进行DocumentDb查询
- 22. Java 8 Stream - Reduce函数的组合器没有执行
- 23. Java 8 Stream - 为什么filter方法不能执行?
- 24. Java-8:如何使用Map.Entry#comparisonByValue对Map(基于值)进行排序,同时忽略区分大小写?
- 25. 使用Java 8 Streaming对一个集合进行分组和排序
- 26. 使用Java 8 Streams对一个集合进行分组和排序(clean solution)
- 27. 使用Java 8进行线程排序
- 28. 如何使用java 8中的reduce对列表进行排序?
- 29. 对Java 8 LocalDateTime
- 30. 按SQL中的计数进行分区
制作懒洋洋地评估分区是在我的经验一般是不可行 - 你会想到,如果你保持几个分区引用发生,然后按顺序访问它们? –
@JonSkeet - 特别是如果它们是平行的。 – OldCurmudgeon
谢谢你的观点,乔恩,我怀疑这一点。你认为下面我自己的答案中的非懒惰实现是最优的吗? – Trader001