Java 8中的默认“paralellStream()”使用常见的ForkJoinPool
,这可能是延迟问题,如果在提交任务时公用池线程耗尽。但是,在许多情况下,CPU功率足够,任务时间足够短,所以这不成问题。如果我们确实有一些长期运行的任务,这当然需要仔细考虑,但是对于这个问题,我们假设这不是问题。在Java8 parallelStream()中使用I/O + ManagedBlocker有什么问题吗?
但是,填充ForkJoinPool
的I/O任务实际上并没有执行任何CPU绑定工作,这是一种引入瓶颈的方法,即使有足够的CPU功率可用。 I understood that。不过那就是我们的ManagedBlocker
。因此,如果我们有I/O任务,我们应该只允许ForkJoinPool
在ManagedBlocker
内管理这个任务。这听起来非常简单。但令我惊讶的是,使用ManagedBlocker
是一件相当复杂的API,因为它很简单。毕竟,我认为这是一个普遍的问题。所以,我刚刚建立了一个简单实用的方法,使ManagedBlocker
很容易为普通情况下使用:
public class BlockingTasks {
public static<T> T callInManagedBlock(final Supplier<T> supplier) {
final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier);
try {
ForkJoinPool.managedBlock(managedBlock);
} catch (InterruptedException e) {
throw new Error(e);
}
return managedBlock.getResult();
}
private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker {
private final Supplier<T> supplier;
private T result;
private boolean done = false;
private SupplierManagedBlock(final Supplier<T> supplier) {
this.supplier = supplier;
}
@Override
public boolean block() {
result = supplier.get();
done = true;
return true;
}
@Override
public boolean isReleasable() {
return done;
}
public T getResult() {
return result;
}
}
}
现在,如果我想下载了几个使用并联网站的HTML代码,我可以这样给它不在I/O造成任何麻烦:
public static void main(String[] args) {
final List<String> pagesHtml = Stream
.of("https://google.com", "https://stackoverflow.com", "...")
.map((url) -> BlockingTasks.callInManagedBlock(() -> download(url)))
.collect(Collectors.toList());
}
我感到有点惊讶的是有像BlockingTasks
以上的Java运(?或者我没找到它)没有阶级,但它并不难建立。
当我谷歌的“Java的8位并行数据流:”我在第一时间拿到四个结果那些声称,由于I/O问题叉/加入吸在Java中的文章:
- https://dzone.com/articles/think-twice-using-java-8
- http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/(至少提到
ManagedBlocker
也说“在不同使用情况下,你可以给它一个ManagedBlocker实例”。它没有提到为什么在这种情况下。
我已经改变了我的搜索条件,虽然有很多人抱怨生活有多可怕,但我没有发现任何人谈论上述解决方案。由于我不喜欢马文(大脑像行星)和Java 8可用一段时间,我怀疑我在那里提出的东西有一些可怕的错误。
我撞在一起的小测试:
public static void main(String[] args) {
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start");
IntStream.range(0, 10).parallel().forEach((x) -> sleep());
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End");
}
public static void sleep() {
try {
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName());
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new Error(e);
}
}
我跑了一个得到以下结果:
18:41:29.021: Start
18:41:29.033: Sleeping main
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7
18:41:39.034: Sleeping main
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:49.035: End
所以,我的8个CPU的计算机上ForkJoinPool
自然会选择8个线程,完成了第一次8个任务,最后是最后两个任务,这意味着这需要20秒,如果有其他任务排队,池可能仍然没有使用闲置的CPU(除了最近10秒内的6个内核)。
然后我用...
IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; }));
...而不是...
IntStream.range(0, 10).parallel().forEach((x) -> sleep());
...,得到了以下结果:
18:44:10.93: Start
18:44:10.945: Sleeping main
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11
18:44:20.957: End
在我看来是这样工作的,额外的线程被启动,以弥补我的模拟“阻塞I/O行动”(睡眠)。时间减少到10秒,我想如果我排队更多的任务,那些仍然可以使用可用的CPU功率。
如果I/O操作被封装在ManagedBlock
中,此解决方案或通常在流中使用I/O是否有任何问题?
分解的好处。这当然会限制单个任务可能花费的时间,但是如果队列中有足够的其他计算任务,它将不会限制总吞吐量。结论:如果只有吞吐量是个问题,那么解决方案很好。如果单个I/O任务的响应时间是一个问题,并且所涉及的单个I/O任务可以在更多步骤中分解,则应考虑其他解决方案。 – yankee
不要忘记:Stream API使用Fork/Join是一个实现细节。只要Streams不保证使用该框架,就不能保证使用'ManagedBlocker'将会提高并发性...... – Holger