2016-05-29 460 views
10

Java 8中的默认“paralellStream()”使用常见的ForkJoinPool,这可能是延迟问题,如果在提交任务时公用池线程耗尽。但是,在许多情况下,CPU功率足够,任务时间足够短,所以这不成问题。如果我们确实有一些长期运行的任务,这当然需要仔细考虑,但是对于这个问题,我们假设这不是问题。在Java8 parallelStream()中使用I/O + ManagedBlocker有什么问题吗?

但是,填充ForkJoinPool的I/O任务实际上并没有执行任何CPU绑定工作,这是一种引入瓶颈的方法,即使有足够的CPU功率可用。 I understood that。不过那就是我们的ManagedBlocker。因此,如果我们有I/O任务,我们应该只允许ForkJoinPoolManagedBlocker内管理这个任务。这听起来非常简单。但令我惊讶的是,使用ManagedBlocker是一件相当复杂的API,因为它很简单。毕竟,我认为这是一个普遍的问题。所以,我刚刚建立了一个简单实用的方法,使ManagedBlocker很容易为普通情况下使用:

public class BlockingTasks { 

    public static<T> T callInManagedBlock(final Supplier<T> supplier) { 
     final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier); 
     try { 
      ForkJoinPool.managedBlock(managedBlock); 
     } catch (InterruptedException e) { 
      throw new Error(e); 
     } 
     return managedBlock.getResult(); 
    } 

    private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker { 
     private final Supplier<T> supplier; 
     private T result; 
     private boolean done = false; 

     private SupplierManagedBlock(final Supplier<T> supplier) { 
      this.supplier = supplier; 
     } 

     @Override 
     public boolean block() { 
      result = supplier.get(); 
      done = true; 
      return true; 
     } 

     @Override 
     public boolean isReleasable() { 
      return done; 
     } 

     public T getResult() { 
      return result; 
     } 
    } 
} 

现在,如果我想下载了几个使用并联网站的HTML代码,我可以这样给它不在I/O造成任何麻烦:

public static void main(String[] args) { 
    final List<String> pagesHtml = Stream 
     .of("https://google.com", "https://stackoverflow.com", "...") 
     .map((url) -> BlockingTasks.callInManagedBlock(() -> download(url))) 
     .collect(Collectors.toList()); 
} 

我感到有点惊讶的是有像BlockingTasks以上的Java运(?或者我没找到它)没有阶级,但它并不难建立。

当我谷歌的“Java的8位并行数据流:”我在第一时间拿到四个结果那些声称,由于I/O问题叉/加入吸在Java中的文章:

我已经改变了我的搜索条件,虽然有很多人抱怨生活有多可怕,但我没有发现任何人谈论上述解决方案。由于我不喜欢马文(大脑像行星)和Java 8可用一段时间,我怀疑我在那里提出的东西有一些可怕的错误。

我撞在一起的小测试:

public static void main(String[] args) { 
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start"); 
    IntStream.range(0, 10).parallel().forEach((x) -> sleep()); 
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End"); 
} 

public static void sleep() { 
    try { 
     System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName()); 
     Thread.sleep(10000); 
    } catch (InterruptedException e) { 
     throw new Error(e); 
    } 
} 

我跑了一个得到以下结果:

18:41:29.021: Start 
18:41:29.033: Sleeping main 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4 
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6 
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3 
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7 
18:41:39.034: Sleeping main 
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1 
18:41:49.035: End 

所以,我的8个CPU的计算机上ForkJoinPool自然会选择8个线程,完成了第一次8个任务,最后是最后两个任务,这意味着这需要20秒,如果有其他任务排队,池可能仍然没有使用闲置的CPU(除了最近10秒内的6个内核)。

然后我用...

IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; })); 

...而不是...

IntStream.range(0, 10).parallel().forEach((x) -> sleep()); 

...,得到了以下结果:

18:44:10.93: Start 
18:44:10.945: Sleeping main 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3 
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11 
18:44:20.957: End 

在我看来是这样工作的,额外的线程被启动,以弥补我的模拟“阻塞I/O行动”(睡眠)。时间减少到10秒,我想如果我排队更多的任务,那些仍然可以使用可用的CPU功率。

如果I/O操作被封装在ManagedBlock中,此解决方案或通常在流中使用I/O是否有任何问题?

回答

6

总之,是的,您的解决方案存在一些问题。它明显改善了在并行流中使用阻塞代码的能力,并且一些第三方库提供了类似的解决方案(例如,请参见jOOλ库中的Blocking类)。但是,此解决方案不会更改Stream API中使用的内部分割策略。通过流API创建的子任务的数量由预定的常数AbstractTask类控制:

/** 
* Default target factor of leaf tasks for parallel decomposition. 
* To allow load balancing, we over-partition, currently to approximately 
* four tasks per processor, which enables others to help out 
* if leaf tasks are uneven or some processors are otherwise busy. 
*/ 
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; 

正如你可以看到它比普通池并行(这是由CPU内核的默认数量)大四倍。真正的分割算法有点棘手,但即使它们全部被阻塞,大致也不会有超过4x-8x的任务。例如,如果您有8个CPU内核,则Thread.sleep()测试将很好地工作到IntStream.range(0, 32)(因为32 = 8 * 4)。但是对于IntStream.range(0, 64),您将有32个并行任务,每个处理两个输入数字,因此整个处理需要20秒,而不是10个。

+0

分解的好处。这当然会限制单个任务可能花费的时间,但是如果队列中有足够的其他计算任务,它将不会限制总吞吐量。结论:如果只有吞吐量是个问题,那么解决方案很好。如果单个I/O任务的响应时间是一个问题,并且所涉及的单个I/O任务可以在更多步骤中分解,则应考虑其他解决方案。 – yankee

+3

不要忘记:Stream API使用Fork/Join是一个实现细节。只要Streams不保证使用该框架,就不能保证使用'ManagedBlocker'将会提高并发性...... – Holger