2014-09-02 453 views
-2
package com.spse.pricing.client.main; 

import java.util.stream.IntStream; 

public class NestedParalleStream { 

    int total = 0; 

    public static void main(String[] args) { 

     NestedParalleStream nestedParalleStream = new NestedParalleStream(); 
     nestedParalleStream.test(); 
    } 


    void test(){ 

     try{ 

      IntStream stream1 = IntStream.range(0, 2); 
      stream1.parallel().forEach(a ->{ 
       IntStream stream2 = IntStream.range(0, 2); 
       stream2.parallel().forEach(b ->{ 
        IntStream stream3 = IntStream.range(0, 2); 
        stream3.parallel().forEach(c ->{ 
         //2 * 2 * 2 = 8; 
         total ++; 

        }); 
       }); 
      }); 

      //It should display 8 
      System.out.println(total); 

     }catch(Exception e){ 

      e.printStackTrace(); 
     } 
    } 
} 

请帮助我们如何自定义parallestream以确保我们能够获得一致性结果。Java 8嵌套ParallelStream不能正常工作

+0

它实际显示的是什么? – 2014-09-02 14:58:11

+1

'它应该显示8'好,它显示什么呢?您是否尝试过使用'total' ['volatile'](http://docs.oracle.com/javase/tutorial/essential/concurrency/atomic.html)? – kajacx 2014-09-02 14:58:12

+0

@kajacx volatile不会帮助,因为'a ++'是'a = a + 1'的快捷键,这不是原子的 – talex 2014-09-02 15:01:02

回答

3

声明中的问题total ++;它在多个线程中同时被调用。

你应该​​保护或使用AtomicInteger

4

由于多个线程递增total 您必须声明它 volatile避免竞争条件


编辑:挥发性品牌的读/写操作是原子操作,但total++需要超过一个操作。在多个线程变异值

AtomicInteger total = new AtomicInteger(); 
... 
total.incrementAndGet(); 
3

LongAdderLongAccumulator优于AtomicLongAtomicInteger和它打算在年底要读取相对较少的时间,如一次:因为这个原因,你应该使用的AtomicInteger计算。加法器/累加器对象避免了原子对象可能发生的争用问题。 (对于double值有相应的加法器/累加器对象。)

通常有一种方法可以使用reduce()collect()重写累加。这些往往是可取的,特别是如果积累(或收集)的价值不是longdouble

2

关于解决方法的可变性存在一个主要问题。以您想要的方式解决问题的更好方法如下:

int total = IntStream.range(0,2) 
      .parallel() 
      .map(i -> { 
       return IntStream.range(0,2) 
         .map(j -> { 
          return IntStream.range(0,2) 
            .map(k -> i * j * k) 
            .reduce(0,(acc, val) -> acc + 1); 
         }).sum(); 
      }).sum();