2011-04-20 58 views
0

我想要并行处理大量的独立行。在下面的代码中,我创建了一个 NUM_THREAD包含 POOL_SIZE行的Theads。 每个线程都启动,然后使用'join'等待每个线程。Java +主题:并行处理行

我想这是一个不好的做法,因为这里完成的线程将不得不等待他的兄弟姐妹在池中。

什么是实现此代码的正确方法?我应该使用哪些类?

谢谢!

class FasterBin extends Thread 
    { 
    private List<String> dataRows=new ArrayList<String>(); 
    private Object result=null; 
    @Override 
    public void run() 
     { 
     for(String s:dataRows) 
      { 
      //Process item here (....) 
      } 
     } 
    } 


(...) 

List<FasterBin> threads=new Vector<FasterBin>(); 
String line; 
Iterator<String> iter=(...); 
for(;;) 
    { 
    while(threads.size()< NUM_THREAD) 
     { 
     FasterBin bin=new FasterBin(); 
     while(
      bin.dataRows.size() < POOL_SIZE && 
      iter.hasNext() 
      ) 
      { 
      nRow++; 
      bin.dataRows.add(iter.next()); 
      } 
     if(bin.dataRows.isEmpty()) break; 
     threads.add(bin); 
     } 
    if(threads.isEmpty()) break; 


    for(FasterBin t:threads) 
     { 
     t.start(); 
     } 
    for(FasterBin t:threads) 
     { 
     t.join(); 
     } 
    for(FasterBin t:threads) 
     { 
     save(t.result);// ## do something with the result (save into a db etc...) 
     } 

    threads.clear(); 
    } 

finally 
    { 
    while(!threads.isEmpty()) 
     { 

     FasterBin b=threads.remove(threads.size()-1); 
     try  { 
      b.interrupt(); 
      } 
     catch (Exception e) 
      { 
      } 
     } 
    } 

回答

3

自己做这一切!要获得1)健壮性和2)正确性非常困难。

而是重写你的东西来创建大量的Runnables或Callables,并使用合适的ExecutorService来让Executor用你想要的行为来处理它们。

请注意,这留在当前的JVM中。如果您有多个JVM(在多台机器上),我会建议您打开一个新问题。

+0

谢谢,这很有帮助 – Pierre 2011-04-20 14:15:26

2

java.util.concurrent.ThreadPoolExecutor。

 ThreadPoolExecutor x=new ScheduledThreadPoolExecutor(10); 
     x.execute(runnable); 

的概述请参见本:Java API for util.concurrent

1

直接使用线程实际上是不鼓励的 - 查看java.util.concurrent包,您会发现应该使用ThreadPools和Futures。

Thread.join并不表示线程等待其他线程,它表示您的主线程等待线程列表中的一个线程死亡。在这种情况下,您的主线程等待最慢的线程完成。我没有看到这种方法的问题。