2016-04-29 78 views
0

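Reading a large number of files with a producer drives CPU usage to 100%

I wrote a simple producer-consumer pattern to help me with the following task:

  1. Read files from a directory that contains 500,000 TSV (tab-separated) files.
  2. Parse each file into a data structure and put it into a blocking queue.
  3. Consume the queue with consumers that query the database.
  4. Compare the two hash maps and, if they differ, print the difference to a file.

When I run the program, my CPU usage suddenly jumps to 100%, even with 5 threads. Could this be because I use a single producer to read the files?

Example file (tab-separated)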

Column1 Column2 Column3 Column4 Column5
A   1   *   -   - 
B   1   *   -   - 
C   1   %   -   - 

Producer

public class Producer implements Runnable{ 
private BlockingQueue<Map<String, Map<String, String>>> m_Queue; 
private String m_Directory; 

public Producer(BlockingQueue<Map<String, Map<String, String>>> i_Queue, String i_Directory) 
{ 
    m_Queue = i_Queue; 
    m_Directory = i_Directory; 
} 

@Override 
public void run() 
{ 
    if (Files.exists(Paths.get(m_Directory))) 
    { 
     File[] files = new File(m_Directory).listFiles(); 

     if (files != null) 
     { 
      for (File file : files) 
      { 
       Map<String, String> map = new HashMap<>(); 
       try (BufferedReader reader = new BufferedReader(new FileReader(file))) 
       { 
        String line, lastcolumn3 = "", column1 = "", column2 = "", column3 = ""; 
        while ((line = reader.readLine()) != null) 
        { 
         //Skip column header 
         if (!Character.isLetter(line.charAt(0))) 
         { 
          String[] splitLine = line.split("\t"); 

          column1 = splitLine[0].replace("\"", ""); 
          column2 = splitLine[1].replace("\"", ""); 
          column3 = splitLine[2].replace("\"", ""); 

          if (!lastcolumn3.equals(column3)) 
          { 
           map.put(column3, column1); 
           lastcolumn3 = column3; 
          } 
         } 
        } 

        map.put(column3, column1); 

        //Column 1 is always the same per file, it'll be the key. Column2 and Column3 are stored as the value (as a key-value pair) 
        Map<String, Map<String, String>> mapPerFile = new HashMap<>(); 
        mapPerFile.put(column2, map); 

        m_Queue.put(mapPerFile); 
       } 
       catch (IOException | InterruptedException e) 
       { 
        System.out.println(file); 
        e.printStackTrace(); 
       } 
      } 
     } 
    } 
}} 

Consumer

public class Consumer implements Runnable{ 
private HashMap<String, String> m_DBResults; 
private BlockingQueue<Map<String, Map<String, String>>> m_Queue; 
private Map<String, Map<String, String>> m_DBResultsPerFile; 
private String m_Column1; 
private int m_ThreadID; 

public Consumer(BlockingQueue<Map<String, Map<String, String>>> i_Queue, int i_ThreadID) 
{ 
    m_Queue = i_Queue; 
    m_ThreadID = i_ThreadID; 
} 

@Override 
public void run() 
{ 
    try 
    { 
     while ((m_DBResultsPerFile = m_Queue.poll()) != null) 
     { 
      //Column1 is always the same, only need the first entry. 
      m_Column1 = m_DBResultsPerFile.keySet().toArray()[0].toString(); 

      //Queries DB and puts returned data into m_DBResults 
      queryDB(m_Column1); 

      //Write the difference, if any, per thread into a file. 
      writeDifference(); 
     } 
    } 
    catch (Exception e) 
    { 
     e.printStackTrace(); 
    } 
} 

private void writeDifference() 
{ 
    MapDifference<String, String> difference = Maps.difference(m_DBResultsPerFile.get(m_Column1), m_DBResults); 

    if (difference.entriesOnlyOnLeft().size() > 0 || difference.entriesOnlyOnRight().size() > 0) 
    { 
     try (BufferedWriter writer = new BufferedWriter(new FileWriter(String.format("thread_%d.tsv", m_ThreadID), true))) 
     { 
      if (difference.entriesOnlyOnLeft().size() > 0) 
      { 
       writer.write(String.format("%s\t%s\t", "Missing", m_Column1)); 
       for (Map.Entry<String, String> entry : difference.entriesOnlyOnLeft().entrySet()) 
       { 
        writer.write(String.format("[%s,%s]; ", entry.getKey(), entry.getValue())); 
       } 

       writer.write("\n"); 
      } 
      if (difference.entriesOnlyOnRight().size() > 0) 
      { 
       writer.write(String.format("%s\t%s\t", "Extra", m_Column1)); 
       for (Map.Entry<String, String> entry : difference.entriesOnlyOnRight().entrySet()) 
       { 
        writer.write(String.format("[%s,%s]; ", entry.getKey(), entry.getValue())); 
       } 

       writer.write("\n"); 
      } 
     } 
     catch (IOException e) 
     { 
      e.printStackTrace(); 
     } 
    } 
}} 

Main

public static void main(String[]args) { 
BlockingQueue<Map<String, Map<String,String>>> queue = new LinkedBlockingQueue <>(); 

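//Start the reader thread. 
//Note: threadPool is not declared in this snippet; it is presumably an ExecutorService defined elsewhere. 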
threadPool.execute(new Producer(queue, args[0])); 

//Create configurable threads. 
for (int i = 0; i < 10; i++) { 
    threadPool.execute(new Consumer(queue, i + 1)); 
} 

threadPool.shutdown(); 
System.out.println("INFO: Shutting down threads."); 

try { 
    threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
    System.out.println("INFO: Threadpool terminated successfully."); 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
}} 
+3

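Is CPU utilization a bad thing?

+2

Do you want your job to run slower?

+0

If you think you are using more CPU than you should (i.e. the total run time is too high), your algorithm may be poor. Profile the code to see where the problem might be. – Andreas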

Answer

6

Your CPU usage is most likely caused by this:

while ((m_DBResultsPerFile = m_Queue.poll()) != null) 

The poll method does not block. It returns immediately, so you end up executing the loop millions of times per second.
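To make the non-blocking behavior concrete, here is a minimal sketch (the class PollDemo and its helper methods are made up for illustration, not taken from the question). On an empty LinkedBlockingQueue, poll() returns null right away instead of waiting, so a loop guarded by a != null check ends the first time a consumer finds the queue empty. The timed overload poll(timeout, unit) waits up to the given time before returning null.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollDemo {
    // Returns what poll() yields on an empty queue: null, without waiting.
    static String pollEmpty() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        return queue.poll();
    }

    // Timed poll: waits up to timeoutMillis for an element, then returns null.
    static String pollWithTimeout(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println(pollEmpty());                 // null
        System.out.println(pollWithTimeout(queue, 100)); // null, after waiting about 100 ms
        queue.add("file-1");
        System.out.println(pollWithTimeout(queue, 100)); // file-1
    }
}
```

If the consumers start before the producer has queued anything, the untimed poll() would make them exit almost immediately, which may be worth checking for in the original program.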

You should use take(), which actually waits for an element to become available:

while ((m_DBResultsPerFile = m_Queue.take()) != null) 

The documentation for BlockingQueue sums all of this up nicely, in a way that (in my opinion) avoids any confusion.
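As a rough illustration of what that documentation describes, BlockingQueue methods come in four flavors: some throw an exception (add, remove), some return a special value (offer, poll), some block (put, take), and some give up after a timeout (the timed offer and poll). The sketch below covers only the first two flavors on a capacity-1 ArrayBlockingQueue; the class name QueueMethodsDemo and the output format are invented for this example.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueMethodsDemo {
    // Describes how each non-blocking method reacts on an empty or full queue.
    static String describe() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // capacity 1
        StringBuilder out = new StringBuilder();

        try {
            queue.remove();                    // empty queue: throws
        } catch (NoSuchElementException e) {
            out.append("remove=throws;");
        }
        out.append("poll=").append(queue.poll()).append(';');      // special value: null

        queue.add("a");                        // the queue is now full
        try {
            queue.add("b");                    // full queue: throws
        } catch (IllegalStateException e) {
            out.append("add=throws;");
        }
        out.append("offer=").append(queue.offer("b")).append(';'); // special value: false
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
        // prints: remove=throws;poll=null;add=throws;offer=false;
    }
}
```

In the same situations, put() and take() would simply wait until space or an element becomes available.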

+2

'take()' never actually returns null. – Andreas
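Since take() never returns null, a != null check can no longer end the consumer loop, so the consumers need some other stop signal. One common option is a "poison pill": a sentinel object the producer enqueues after its last real item. The sketch below is only an illustration under that assumption; PoisonPillDemo, POISON and the String payload are hypothetical stand-ins for the maps used in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PoisonPillDemo {
    // Hypothetical sentinel; compared by identity, so it cannot collide with real data.
    static final String POISON = new String("POISON");

    // Consumes until the sentinel is seen; take() blocks while the queue is empty.
    static List<String> consume(BlockingQueue<String> queue) {
        List<String> seen = new ArrayList<>();
        try {
            while (true) {
                String item = queue.take();
                if (item == POISON) {
                    queue.put(POISON); // pass the pill on to the next consumer
                    break;
                }
                seen.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return seen;
    }

    // Runs one producer thread and one consumer (the calling thread) over a shared queue.
    static List<String> runDemo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put("file-1");
                queue.put("file-2");
                queue.put(POISON); // signals "no more work"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        return consume(queue);
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [file-1, file-2]
    }
}
```

Re-queuing the pill lets any number of consumers shut down from a single sentinel, so the producer does not need to know how many consumers exist.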

+0

I tried replacing poll() with take() and did not see much of a difference. I'll see whether I can tune the producer class better. – ocp1000