2016-01-06 1955 views
1

我有一个测试代码parallelStream()发送请求到服务器机器。如何减少Java parallelStream中的线程数量?

Report report = 
    requestsList.parallelStream() 
       .map(request -> freshResultsGenerator.getResponse(request, e2EResultLongBL)) 
       .map(response -> resultsComparer.compareToBl(response, e2EResultLongBL, 
          astarHistogramsArrayBl, latencyHistogramBl)) 
       .reduce(null, 
         (sumReport, compare2) -> 
         { 
          if (sumReport == null) { 
           sumReport = new Report(); 
          } 
          sumReport.add(compare2); 
          return sumReport; 
         }, 
         (report1, report2) -> 
         { 
          Report report3 = new Report(); 
          report3.add(report1); 
          report3.add(report2); 
          return report3; 
         }); 

这台机器的负载太多,并且很快就会返回HTTP 404错误。

有两件事情我没有找到在谷歌的答案:

  1. 什么是parallelStream默认线程#,如果不customed 集?
  2. 如何将工作线程的数量设置为4?

回答

2

Stream API使用ForkJoinPool执行并发任务。引用它的文档:

共用池是通过使用默认参数的构造默认值,但这些可以设置三个系统属性来控制:

  • java.util.concurrent.ForkJoinPool.common.parallelism - 并行级,一个非负整数
    ...

而且

a ForkJoinPool可以用给定的目标并行性级别来构造;默认情况下,等于可用处理器的数量。

所以定制的线程数,你可以在系统属性java.util.concurrent.ForkJoinPool.common.parallelism的值设置为你想要的:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4") 

工作线程的数量设置为4.默认情况下,数量线程将等于您拥有的处理器数量。

+1

更改属性将有助于更改默认值。如果您需要使用专用ForkJoinPool的不同流量的不同值,将会是更好的解决方案。 – SubOptimal

1

ParallelStream使用ForkJoinPool.commonPool()已初始化为您的核心数 - 1

它可以将您自己的ForkJoin Executor传递给parallelStream,如here所述,但是执行者必须是ForkJoin,这对于IO绑定任务来说不是最好的。

+0

谢谢。为什么“这不是IO界限任务的最佳选择”? –

+0

@EladBenda任何fork连接执行器 –

1

你可以启动你的管道自定义ForkJoinPool内,它将被用于获取结果:

ForkJoinPool fjp = new ForkJoinPool(2); 
System.out.println("My pool: " + fjp); 
String result = CompletableFuture.supplyAsync(
    () -> Stream.of("a", "b", "c").parallel() 
    .peek(x -> System.out.println(
     ((ForkJoinWorkerThread) Thread.currentThread()).getPool())) 
    .collect(Collectors.joining()), fjp).join(); 
System.out.println(result); 

StreamEx库增添了语法糖方法.parallel(fjp),使这个简单的:

String result = StreamEx.of("a", "b", "c") 
    .parallel(fjp) 
    .peek(x -> System.out.println(
     ((ForkJoinWorkerThread) Thread.currentThread()).getPool())) 
    .collect(Collectors.joining());