2015-11-07 148 views
-1
public static void getTestData() { 

     try { 

      filename = "InventoryData_" + form_id; 

      PrintWriter writer = new PrintWriter("/Users/pnroy/Documents/" +filename + ".txt"); 
      pids = new ArrayList<ProductId>(); 
      GetData productList = new GetData(); 
      System.out.println("Getting productId"); 
      pids = productList.GetProductIds(form_id); 
      int perThreadSize = pids.size()/numberOfCrawlers; 
      ArrayList<ArrayList<ProductId>> perThreadData = new  
      ArrayList<ArrayList<ProductId>>(numberOfCrawlers); 
      for (int i = 1; i <= numberOfCrawlers; i++) { 
       perThreadData.add(new ArrayList<ProductId>(perThreadSize)); 
       for (int j = 0; j < perThreadSize; j++) { 
        ProductId ids = new ProductId(); 
        ids.setEbProductID((pids.get(((i - 1) * perThreadSize + j))).getEbProductID()); 
        ids.setECProductID((pids.get(((i - 1) * perThreadSize + j))).getECProductID()); 
        perThreadData.get(i - 1).add(ids); 
       } 
      } 

      BlockingQueue<String> q = new LinkedBlockingQueue<String>(); 
      Consumer c1 = new Consumer(q); 
      Thread[] thread = new Thread[numberOfCrawlers]; 
      for (int k = 0; k <= numberOfCrawlers; k++) { 
       // System.out.println(k); 
       GetCombinedData data = new GetCombinedData(); 
       thread[k] = new Thread(data); 
       thread[k].setDaemon(true); 
       data.setVal(perThreadData.get(k), filename, q); 
       thread[k].start(); 

       // writer.println(data.getResult()); 
      } 
      new Thread(c1).start(); 
      for (int l = 0; l <= numberOfCrawlers; l++) { 
       thread[l].join(); 

      } 
     } catch (Exception e) { 
     } 
    } 

这里爬网程序的数量是线程的数量。java中有多线程的多线程

GetCombined类的运行方法具有以下的代码: 的PID作为来自 类CassController查询一个API的主要方法perThreadData.get(K-1)通过,我得到一些处理后的字符串结果。

public void run(){ 
     try{ 

     for(int i=0;i<pids.size();i++){ 
      //System.out.println("before cassini"); 
     CassController cass = new CassController(); 
     String result=cass.getPaginationDetails(pids.get(i)); 
     queue.put(result); 
     // System.out.println(result); 
     Thread.sleep(1000); 
      } 
     writer.close(); 
     }catch(Exception ex){ 

     } 

Consumer.java具有下面的代码:

public class Consumer implements Runnable{ 
    private final BlockingQueue queue; 
    Consumer(BlockingQueue q) { queue = q; } 
    public void run(){ 
     try { 
       while (queue.size()>0) 
       { 
        consume(queue.take()); 
       } 
      } catch (InterruptedException ex) 
       { 
       } 

    } 
    void consume(Object x) { 
     try{ 
     PrintWriter writer = new PrintWriter(new FileWriter("/Users/pnroy/Documents/Inventory", true)); 
     writer.println(x.toString()); 
     writer.close(); 
     }catch(IOException ex){ 

     } 

    } 

所以,如果我设置爬虫的数量为10,如果有500个记录每个线程将处理50个records.I需要编写结果成一个文件。我很困惑,我可以做到这一点,因为它的线程数组和每个线程正在做一堆操作。

我试过使用阻塞队列,但那是打印重复结果。我是新的多线程,不知道如何处理这种情况。 你可以请建议。

+1

您是否必须为此使用一个线程数组?如果你不这样做,请使用'ExecutorService'来代替,你会发现生活变得如此简单。 – biziclop

+1

为什么你的程序会在稍后要加入join的线程上调用'setDaemon(true)'? –

+0

它可能会帮助其他程序员理解你的代码,如果你要为你的类和变量使用名词式名称,以及为你的方法使用动词式名称。方法_do_东西,物件_are_东西。 –

回答

-1

随着许多有用的高级并发类的引入,现在推荐不再直接使用Thread类。即使是BlockingQueue类也是相当低级的。

取而代之,您有一个不错的应用程序CompletionService,它建立在ExecutorService的基础上。以下示例显示如何使用它。

您想要替换PartialResultTask(即主要处理发生的位置)和System.out.println(这是您可能希望将结果写入文件的位置)中的代码。

public class ParallelProcessing { 

    public static void main(String[] args) { 
     ExecutorService executionService = Executors.newFixedThreadPool(10); 
     CompletionService<String> completionService = new ExecutorCompletionService<>(executionService); 

     // submit tasks 
     for (int i = 0; i < 500; i++) { 
      completionService.submit(new PartialResultTask(i)); 
     } 

     // collect result 
     for (int i = 0; i < 500; i++) { 
      String result = getNextResult(completionService); 
      if (result != null) 
       System.out.println(result); 
     } 

     executionService.shutdown(); 
    } 

    private static String getNextResult(CompletionService<String> completionService) { 

     Future<String> result = null; 
     while (result == null) { 
      try { 
       result = completionService.take(); 
      } catch (InterruptedException e) { 
       // ignore and retry 
      } 
     } 

     try { 
      return result.get(); 
     } catch (ExecutionException e) { 
      e.printStackTrace(); 
      return null; 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
      return null; 
     } 
    } 


    static class PartialResultTask implements Callable<String> { 

     private int n; 

     public PartialResultTask(int n) { 
      this.n = n; 
     } 

     @Override 
     public String call() { 
      return String.format("Partial result %d", n); 
     } 
    } 
}