2011-10-11 311 views
34

我有一个.csv文件,其中包含超过7000万行,其中每行将生成可运行,然后由threadpool执行。这个Runnable将会在Mysql中插入一条记录。线程池如何在关机后重新使用

更重要的是,我想为RandomAccessFile定位csv文件的位置。该位置被写入文件。我想在线程池中的所有线程都完成时写入此记录。因此将调用ThreadPoolExecutor.shutdown()。但是当更多的线路来临时,我需要再次使用线程池。我怎样才能重用这个当前的线程池而不是创建一个新的线程池。

的代码如下:

public static boolean processPage() throws Exception { 

    long pos = getPosition(); 
    long start = System.currentTimeMillis(); 
    raf.seek(pos); 
    if(pos==0) 
     raf.readLine(); 
    for (int i = 0; i < PAGESIZE; i++) { 
     String lineStr = raf.readLine(); 
     if (lineStr == null) 
      return false; 
     String[] line = lineStr.split(","); 
     final ExperienceLogDO log = CsvExperienceLog.generateLog(line); 
     //System.out.println("userId: "+log.getUserId()%512); 

     pool.execute(new Runnable(){ 
      public void run(){ 
       try { 
        experienceService.insertExperienceLog(log); 
       } catch (BaseException e) { 
        e.printStackTrace(); 
       } 
      } 
     }); 

     long end = System.currentTimeMillis(); 
    } 

    BufferedWriter resultWriter = new BufferedWriter(
      new OutputStreamWriter(new FileOutputStream(new File(
        RESULT_FILENAME), true))); 
    resultWriter.write("\n"); 
    resultWriter.write(String.valueOf(raf.getFilePointer())); 
    resultWriter.close(); 
    long time = System.currentTimeMillis()-start; 
    System.out.println(time); 
    return true; 
} 

谢谢!

回答

31

正如documentation说,你不能重用已关闭的ExecutorService。由于(a)在所有情况下它们可能无法按预期工作,因此我建议不要使用任何变通办法;和(b)你可以使用标准类实现你想要的。

您必须

  1. 实例化一个新ExecutorService;或

  2. 不终止ExecutorService

第一个解决方案很容易实现,所以我不会详细说明。

对于第二种情况,由于您想在所有提交的任务完成后执行操作,因此您可以查看ExecutorCompletionService并使用它。它封装了一个ExecutorService它会做的线程管理,但可运行将得到包裹到的东西,会告诉ExecutorCompletionService时,他们已经完成,因此它可以向您报告:在ExecutorCompletionService

ExecutorService executor = ...; 
ExecutorCompletionService ecs = new ExecutorCompletionService(executor); 

for (int i = 0; i < totalTasks; i++) { 
    ... ecs.submit(...); ... 
} 

for (int i = 0; i < totalTasks; i++) { 
    ecs.take(); 
} 

的方法take()课程将阻止,直到任务完成(正常或突然)。它会返回Future,所以你可以检查结果,如果你想。

我希望这可以帮助你,因为我没有完全理解你的问题。

+0

我如何强制停止线程或取消所有任务?使用exectuer.shutdownNow()?即使使用这种包装,它也能正常工作吗? –

3

创建和组的所有任务,并与invokeAll提交给池(当所有任务都成功完成,只有返回)

+0

什么是更好的方法来使用'ExecutorCompletionService'或'invokeAll'? –

1

在ExecutorService上调用关机后,no new Task will be accepted。这必须为每一轮任务创建一个新的ExecutorService。

然而,随着Java 8 ForkJoinPool.awaitQuiescence被引入。如果可以从正常的ExecutorService切换到ForkJoinPool,则可以使用此方法等待,直到没有更多任务在ForkJoinPool中运行而不必调用关闭。这意味着您可以使用任务填充ForkJoinPool,等待它为空(静止),然后再次使用任务填充它,等等。