2013-03-15 32 views
1

我从文件中读取行,当然在一个线程中。行按键排序。并行部分任务的Java并发模式

然后我收集具有相同键(15-20行)的行,进行解析,计算大等,并将结果对象推送到统计类。

我想将我的程序并行读入一个线程,在许多线程中进行解析和计算,并将结果加入到一个线程中写入stat类。

java7框架中是否存在针对此问题的任何就绪模式或解决方案?

我执行人多线程意识到这一点,推到BlockingQueue的,并在另一个线程读取队列,但我认为我的代码吸并会产生错误

非常感谢

UPD:

我无法映射内存中的所有文件 - 这是非常大的

+2

听起来像你在正确的轨道上。让我们看看它。 – 2013-03-15 12:04:41

+2

你看看Fork/Join框架能为你做什么吗? – 2013-03-15 12:15:27

回答

0

这里是如果问你试图分裂工作,我会做什么:

public class App { 

    public static class Statistics { 
    } 

    public static class StatisticsCalculator implements Callable<Statistics> { 

     private final List<String> lines; 

     public StatisticsCalculator(List<String> lines) { 
      this.lines = lines; 
     } 

     @Override 
     public Statistics call() throws Exception { 
      //do stuff with lines 
      return new Statistics(); 
     } 
    } 

    public static void main(String[] args) { 
     final File file = new File("path/to/my/file"); 
     final List<List<String>> partitionedWork = partitionWork(readLines(file), 10); 
     final List<Callable<Statistics>> callables = new LinkedList<>(); 
     for (final List<String> work : partitionedWork) { 
      callables.add(new StatisticsCalculator(work)); 
     } 
     final ExecutorService executorService = Executors.newFixedThreadPool(Math.min(partitionedWork.size(), 10)); 
     final List<Future<Statistics>> futures; 
     try { 
      futures = executorService.invokeAll(callables); 
     } catch (InterruptedException ex) { 
      throw new RuntimeException(ex); 
     } 
     try { 
      for (final Future<Statistics> future : futures) { 
       final Statistics statistics = future.get(); 
       //do whatever to aggregate the individual 
      } 
     } catch (InterruptedException | ExecutionException ex) { 
      throw new RuntimeException(ex); 
     } 
     executorService.shutdown(); 
     try { 
      executorService.awaitTermination(1, TimeUnit.DAYS); 
     } catch (InterruptedException ex) { 
      throw new RuntimeException(ex); 
     } 
    } 

    static List<String> readLines(final File file) { 
     //read lines 
     return new ArrayList<>(); 
    } 

    static List<List<String>> partitionWork(final List<String> lines, final int blockSize) { 
     //divide up the incoming list into a number of chunks 
     final List<List<String>> partitionedWork = new LinkedList<>(); 
     for (int i = lines.size(); i > 0; i -= blockSize) { 
      int start = i > blockSize ? i - blockSize : 0; 
      partitionedWork.add(lines.subList(start, i)); 
     } 
     return partitionedWork; 
    } 
} 

我已创建一个Statistics对象,持有所做的工作的结果。

有一个StatisticsCalculator对象这是一个Callable<Statistics> - 这是做计算。给它一个List<String>,它处理这些行并创建Statistics

readLines方法我给你实施。

最重要的方法在许多方面是partitionWork方法,这个划分传入List<String>这是所有文件中使用blockSize线成List<List<String>>。这基本上决定了每个线程应该有多少工作,调整参数非常重要。就好像每项工作只有一条线路,那么管理费用可能会超出优势,而如果每条线路的工作量只有一条,那么只有一条线路工作Thread

最后操作的肉是main方法。这将调用read和partition方法。它产生了一个ExecutorService,其线程数等于工作的位数,但最多为10个。您可以通过这种方式使其等于您拥有的核心数。

main方法然后提交所有Callable S,一个用于每个组块的ListexecutorService。方法会阻止invokeAll直到完成工作。

该方法现在循环遍历每个返回的List<Future>并获取生成的每个对象的Statistics对象;准备聚合。

之后不要忘记关闭executorService,因为它会阻止您的申请表退出。

编辑

OP希望通过线上线读取所以这里是一个修订main

public static void main(String[] args) throws IOException { 
    final File file = new File("path/to/my/file"); 
    final ExecutorService executorService = Executors.newFixedThreadPool(10); 
    final List<Future<Statistics>> futures = new LinkedList<>(); 
    try (final BufferedReader reader = new BufferedReader(new FileReader(file))) { 
     List<String> tmp = new LinkedList<>(); 
     String line = null; 
     while ((line = reader.readLine()) != null) { 
      tmp.add(line); 
      if (tmp.size() == 100) { 
       futures.add(executorService.submit(new StatisticsCalculator(tmp))); 
       tmp = new LinkedList<>(); 
      } 
     } 
     if (!tmp.isEmpty()) { 
      futures.add(executorService.submit(new StatisticsCalculator(tmp))); 
     } 
    } 
    try { 
     for (final Future<Statistics> future : futures) { 
      final Statistics statistics = future.get(); 
      //do whatever to aggregate the individual 
     } 
    } catch (InterruptedException | ExecutionException ex) { 
     throw new RuntimeException(ex); 
    } 
    executorService.shutdown(); 
    try { 
     executorService.awaitTermination(1, TimeUnit.DAYS); 
    } catch (InterruptedException ex) { 
     throw new RuntimeException(ex); 
    } 
} 

这由线流文件中的行和后线的给定数量触发新任务处理到执行者的线路。

您需要调用clear上的CallableList<String>当你用它做的Callable实例是由Future就是他们返回引用。如果在完成这些操作后清除List,应该大大减少内存占用量。

进一步的增强很可能使用的建议hereExecutorService阻止,直到有一个备用的话题 - 这将guranatee有从来没有超过threads*blocksize行内存在的时间,如果您清除List■当Callable与他们完成。

+0

谢谢你的代码,但我不能映射内存中的所有文件 - 它太大了 – 2013-03-15 15:39:15

+0

那么改变代码 - 然后创建工作人员。每个x行向执行器提交另一个任务并存储'Fututre'。 – 2013-03-15 15:42:05

+0

你会意识到我的解决方案。现在这个代码会关闭堆空间(文件很大,统计数据也会对象大)。另一个线程处理准备好的期货存储在队列中? – 2013-03-15 16:11:18

2

你已经有了主要的方法。 CountDownLatch,Thread.join,Executors,Fork/Join。另一种选择是Akka框架,它具有1-2微秒的消息传递开销,并且是开源的。然而,让我分享另一种经常执行上述方法的方法,这种方法更简单,这种方法源于许多公司针对Java文件的批处理文件加载。

假设你将工作分解的目标是表现而不是学习。以从开始到结束需要多长时间衡量的性能。那么通常很难让它比内存映射文件更快,并且在单个线程中进行处理,而这个线程已被固定到单个内核中。它也提供了更简单的代码。双赢。

这可能是违反直觉的,但是文件处理的速度几乎总是受限于文件加载的效率。不是如何平行的处理。因此映射文件的内存是一个巨大的胜利。一旦映射内存,我们希望该算法在执行文件加载时与硬件的争用较少。现代硬件倾向于将IO控制器和内存控制器与CPU相同的插槽上;当它与CPU内部的预取程序结合使用时,会在从单线程有序地处理文件时导致很多效率。这可能太极端了,并行可能会慢得多。将线程固定到内核通常会将内存限制算法加速5倍。这就是内存映射部分如此重要的原因。

如果您还没有,请尝试一下。

+0

感谢你提供关于内存映射的建议。我的文件 - gzip中有40GB,所以我不想把它全部映射出来,但我会尝试在这里进行优化。我的多线程代码现在加速了2次并停留在阅读 – 2013-03-15 15:33:33

+1

您应该查看Martin Thomson写的关于[Java顺序IO性能]的内容(http://mechanical-sympathy.blogspot.fr/2011/12/java- sequential-io-performance.html)去年。像往常一样你的里程可能会有所不同如果您已将IO视为瓶颈,则应针对不同的解决方案进行基准测试。 – 2013-03-15 18:51:28

+1

事实上,40克会有些大图:)我第二次看马丁汤普森的工作,特别是他的博客上有一些可以使用的基准代码。尼斯找到Clement。鉴于如此大的文件,请注意过早提升物体。如果发生这种情况,GC会伤害你。 gzip也会限制你的选择。 – 2013-03-15 19:27:00

1

没有事实和数字,很难给你提供建议。所以让我们从头开始:

  1. 您必须找出瓶颈。你真的需要并行执行计算吗?还是你的工作IO限制?如果可能,避免并发,它可能会更快。
  2. 如果计算必须并行完成,您必须确定您的任务必须有多好或粗粒度。您需要测量您的计算和任务才能调整它们的大小。避免创建太多的任务
  3. 你应该有一个IO线程,几个worker和一个“data gatherer”线程。没有可变数据。
  4. 请务必不要因提交任务而减慢IO线程。否则,你应该使用更粗粒度的任务或使用更好的任务调度程序(谁说disruptor?)
  5. “数据采集”线程应该是唯一一个变异的最终状态
  6. 避免不必要的数据复制和对象创建。通常,在大文件上迭代时,瓶颈是GC。上周,我通过flyweight模式取代了标准scala对象,实现了6倍加速。您还应该尝试预先分配所有内容并使用大缓冲区(页面大小)。
  7. 避免磁盘寻道。

这样说你应该是一个正确的轨道。您可以使用正确大小的任务从Executor开始。任务写入一个数据结构,就像阻塞队列一样,由工作者和“数据收集器”线程共享。这种线程模型非常简单,高效并且很难出错。它通常足够有效。如果你仍然需要更好的表现,那么你必须剖析你的应用并理解瓶颈。然后你可以决定如何去做:改善你的任务规模,使用更快的工具,比如干扰器/ Akka,改进IO,创建更少的对象,调整代码,购买更大的机器或更快的磁盘,移动到Hadoop等。到一个核心(需要平台特定的代码)也可以提供显着的提升。