2013-10-25 98 views
1

我没有太多的经验,使多线程应用程序,但我觉得我的计划是在一个点,它可以由具有多个线程中受益。我正在做一个更大规模的项目,涉及使用分类器(如机器学习中)对大约32000个客户进行分类。我调试了该程序,发现需要大约一秒来对每个用户进行分类。换句话说,这需要8.8小时才能完成!在Java中使用多个线程来缩短项目时间

有没有什么办法可以运行4个线程,每个线程处理8000个用户?第一个线程将处理1-8000,第二个8001-16000,第三个16001-23000,第四个23001-32000。此外,截至目前每个分类是通过调用另一个类的静态函数...

然后当除主之一,其他线程应该结束完成。这是可行的吗?如果是这样,我将不胜感激,如果有人可以提供关于如何做到这一点的提示或步骤。我很熟悉关键部分的想法(等待/信号),但几乎没有经验。

再次,任何帮助将非常感激!有关如何处理这种情况的提示和建议值得欢迎!不知道它的问题,但我有一个2.53 GHZ处理器速度的Core 2 Duo PC。

+0

通过实现上述内容,您不会减少计算时间。 – Kon

+0

如果您的工作完全受CPU限制,则您的有效并行性级别将严格限制为CPU内核数量。 – SLaks

+2

确保任何共享状态都是线程安全的。更好的是,摆脱任何共享状态。 – SLaks

回答

2

这是Apache Hadoop的,这需要大约每个服务器数据的64MB块太轻...但..它是阿卡演员的绝佳机会,而且,它恰好支持Java!

http://doc.akka.io/docs/akka/2.1.4/java/untyped-actors.html

基本上,你可以有4个演员做的工作,并为他们完成分类的用户,或可能会更好,用户数量,他们要么将它传递给一个“接收器”的演员,这使将信息导入到数据结构或输出文件中,或者,通过每次写入文件来执行并行I/O ..然后可以在完成所有文件时检查/组合文件。

如果你想获得更看中/强大的,你可以把远程服务器上的演员。与他们沟通仍然非常容易,并且您将利用多台服务器的CPU /资源。

我写了一篇自己的阿卡演员,但它在斯卡拉,所以我就饶你。但是,如果你是谷歌“akka演员”,你会得到很多关于如何使用它的手持示例。勇敢一点,立即潜入并试验。 “演员系统”是一个非常简单的概念。我知道你可以做到这一点!

+0

哇,这听起来非常神奇!我之前从数据库类中听说过Hadoop,但是Akka演员听起来很有前途。我会深入探讨这一点。我相信我会在这个过程中学到很多有用的东西。再次感谢您和其他所有回复! – Tastybrownies

1

将数据拆分为实现Runnable的对象,然后将它们传递给新线程。

在这种情况下,有四个以上的线程不会消灭你,但你不能获得比核心更多的并行工作(如注释中提到的那样) - 如果线程多于核心,系统将不得不处理谁去的时候。

如果我有一类客户,我想发出一个线程来8000个客户放在首要更大集合我可能会做这样的事情:

public class CustomerClassifier implements Runnable { 

    private customer[] customers; 

    public CustomerClassifier(customer[] customers) { 
    this.customers = customers; 
    } 
    @Override 
    public void run() { 
    for (int i=0; i< customers.length; i++) { 
     classify(customer);//critical that this classify function does not 
         //attempt to modify a resource outside this class 
         //unless it handles locking, or is talking to a database 
         //or something that won't throw fits about resource locking 
    } 
    } 
} 

然后发出这些线程别处

int jobSize = 8000; 
customer[] customers = new customer[jobSize](); 
int j = 0; 
for (int i =0; i+j< fullCustomerArray.length; i++) { 
    if (i == jobSize-1) { 
    new Thread(new CustomerClassifier(customers)).start();//run will be invoked by thread 
    customers = new Customer[jobSize](); 
    j += i; 
    i = 0; 
    } 
    customers[i] = fullCustomerArray[i+j]; 
} 

如果你有你的分类方法会影响同一资源的地方,你将不得不 实现锁定,也将杀死获得了一定程度的优势。

并发是非常复杂的,需要大量的心思,我也建议看oracle的文档http://docs.oracle.com/javase/tutorial/essential/concurrency/index.html (我知道链接是坏的,但希望在Oracle文档不走动太多?)

免责声明:我不是并发设计或多线程(不同主题)的专家。

+0

非常感谢您花时间写出来的时间。我会牢记你的想法! – Tastybrownies

1

如果将输入数组拆分为4个相等的4个线程的子阵列,则不能保证所有线程同时完成。您最好将所有数据放在一个队列中,让所有工作线程从该通用队列中提供。使用安全的BlockingQueue实现为了不写低级同步/等待/通知代码。

+0

好点,我应该提到,他们这样划分他们形成了关于秩序无关的假设,并且在任务完成时并没有跟踪任何事情。 – Catalyst

0

从java 6我们有一些方便的并发使用。您可能需要考虑使用线程池来实现更清晰的实现。

package com.threads; 

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 

public class ParalleliseArrayConsumption { 

    private int[] itemsToBeProcessed ; 

    public ParalleliseArrayConsumption(int size){ 
     itemsToBeProcessed = new int[size]; 
    } 

    /** 
    * @param args 
    */ 
    public static void main(String[] args) { 
     (new ParalleliseArrayConsumption(32)).processUsers(4); 

    } 

    public void processUsers(int numOfWorkerThreads){ 
     ExecutorService threadPool = Executors.newFixedThreadPool(numOfWorkerThreads); 
     int chunk = itemsToBeProcessed.length/numOfWorkerThreads; 
     int start = 0; 
     List<Future> tasks = new ArrayList<Future>(); 
     for(int i=0;i<numOfWorkerThreads;i++){ 
      tasks.add(threadPool.submit(new WorkerThread(start, start+chunk))); 
      start = start+chunk; 
     } 
      // join all worker threads to main thread 
     for(Future f:tasks){ 
      try { 
       f.get(); 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } catch (ExecutionException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

     threadPool.shutdown(); 
     while(!threadPool.isTerminated()){ 
     } 

    } 

    private class WorkerThread implements Callable{ 

     private int startIndex; 
     private int endIndex; 

     public WorkerThread(int startIndex, int endIndex){ 
      this.startIndex = startIndex; 
      this.endIndex = endIndex; 
     } 

     @Override 
     public Object call() throws Exception { 
      for(int currentUserIndex = startIndex;currentUserIndex<endIndex;currentUserIndex++){ 
       // process the user. Add your logic here 
       System.out.println(currentUserIndex+" is the user being processed in thread " +Thread.currentThread().getName()); 
      } 
      return null; 
     }  

    } 

}