2012-02-16 90 views
13

我想在多线程的Lucene中构建我的索引。所以,我开始编写代码并编写下面的代码。首先我找到文件和每个文件,我创建一个线索来索引它。之后,我加入线程并优化索引。它可以工作,但我不确定...我可以大规模信任它吗?有什么办法可以改善它吗?使用lucene改进多线程索引

import java.io.File; 
import java.io.FileFilter; 
import java.io.FileReader; 
import java.io.IOException; 
import java.io.File; 
import java.io.FileReader; 
import java.io.BufferedReader; 
import org.apache.lucene.index.IndexWriter; 
import org.apache.lucene.document.Field; 
import org.apache.lucene.document.Document; 
import org.apache.lucene.store.RAMDirectory; 
import org.apache.lucene.analysis.standard.StandardAnalyzer; 
import org.apache.lucene.analysis.StopAnalyzer; 
import org.apache.lucene.index.IndexReader; 
import org.apache.lucene.store.Directory; 
import org.apache.lucene.store.FSDirectory; 
import org.apache.lucene.util.Version; 
import org.apache.lucene.index.TermFreqVector; 

public class mIndexer extends Thread { 

    private File ifile; 
    private static IndexWriter writer; 

    public mIndexer(File f) { 
    ifile = f.getAbsoluteFile(); 
    } 

    public static void main(String args[]) throws Exception { 
    System.out.println("here..."); 

    String indexDir; 
     String dataDir; 
    if (args.length != 2) { 
     dataDir = new String("/home/omid/Ranking/docs/"); 
     indexDir = new String("/home/omid/Ranking/indexes/"); 
    } 
    else { 
     dataDir = args[0]; 
     indexDir = args[1]; 
    } 

    long start = System.currentTimeMillis(); 

    Directory dir = FSDirectory.open(new File(indexDir)); 
    writer = new IndexWriter(dir, 
    new StopAnalyzer(Version.LUCENE_34, new File("/home/omid/Desktop/stopwords.txt")), 
    true, 
    IndexWriter.MaxFieldLength.UNLIMITED); 
    int numIndexed = 0; 
    try { 
     numIndexed = index(dataDir, new TextFilesFilter()); 
    } finally { 
     long end = System.currentTimeMillis(); 
     System.out.println("Indexing " + numIndexed + " files took " + (end - start) + " milliseconds"); 
     writer.optimize(); 
     System.out.println("Optimization took place in " + (System.currentTimeMillis() - end) + " milliseconds"); 
     writer.close(); 
    } 
    System.out.println("Enjoy your day/night"); 
    } 

    public static int index(String dataDir, FileFilter filter) throws Exception { 
    File[] dires = new File(dataDir).listFiles(); 
    for (File d: dires) { 
     if (d.isDirectory()) { 
     File[] files = new File(d.getAbsolutePath()).listFiles(); 
     for (File f: files) { 
      if (!f.isDirectory() && 
      !f.isHidden() && 
      f.exists() && 
      f.canRead() && 
      (filter == null || filter.accept(f))) { 
       Thread t = new mIndexer(f); 
       t.start(); 
       t.join(); 
      } 
     } 
     } 
    } 
    return writer.numDocs(); 
    } 

    private static class TextFilesFilter implements FileFilter { 
    public boolean accept(File path) { 
     return path.getName().toLowerCase().endsWith(".txt"); 
    } 
    } 

    protected Document getDocument() throws Exception { 
    Document doc = new Document(); 
    if (ifile.exists()) { 
     doc.add(new Field("contents", new FileReader(ifile), Field.TermVector.YES)); 
     doc.add(new Field("path", ifile.getAbsolutePath(), Field.Store.YES, Field.Index.NOT_ANALYZED)); 
     String cat = "WIR"; 
     cat = ifile.getAbsolutePath().substring(0, ifile.getAbsolutePath().length()-ifile.getName().length()-1); 
     cat = cat.substring(cat.lastIndexOf('/')+1, cat.length()); 
     //doc.add(new Field("category", cat.subSequence(0, cat.length()), Field.Store.YES)); 
     //System.out.println(cat.subSequence(0, cat.length())); 
    } 
    return doc; 
    } 

    public void run() { 
    try { 
     System.out.println("Indexing " + ifile.getAbsolutePath()); 
     Document doc = getDocument(); 
     writer.addDocument(doc); 
    } catch (Exception e) { 
     System.out.println(e.toString()); 
    } 

    } 
} 

任何hep被视为。

回答

13

如果你想并行索引,有两件事情可以做:

  • 并行调用addDocument,
  • 提高您的合并调度的最大线程数。

你是在正确的路径上并行调用addDocuments,但每个文档产生一个线程将不会扩展,因为你需要索引的文档数量会增加。您应该使用固定大小的ThreadPoolExecutor。由于此任务主要是CPU密集型(取决于您的分析器和您检索数据的方式),因此将计算机的CPU数设置为最大线程数可能是一个好的开始。

关于合并调度程序,您可以增加可用于setMaxThreadCount method of ConcurrentMergeScheduler的最大线程数。请注意,顺序读取/写入的磁盘比随机读取/写入要好得多,因此,为合并调度程序设置的线程的最大数量太大会降低索引的速度而不是加快速度。

但在尝试并行化索引过程之前,您应该试着找出瓶颈在哪里。如果磁盘速度过慢,则瓶颈可能是冲洗和合并步骤,因此将调用并行化addDocument(主要在于分析文档并在内存中缓存分析结果)不会提高索引速度在所有。

一些旁注:

  • 有一个在Lucene的开发版本的一些正在进行的工作,以提高索引并行性(冲洗部分尤其是,这blog entry解释它是如何工作)。

  • Lucene在how to improve indexing speed上有很好的wiki页面,您可以在其中找到其他方法来提高索引速度。

+0

我真的很感激你你对线程数量的评论是非常有用的,我之前没有提到过...... – orezvani 2012-02-25 19:17:25

5

我认为更现代的方法是使用ThreadPoolExecutor并提交Runnable这是做你的索引。您可以等待所有线程使用.awaitTermination或CountdownLatch终止。

我不喜欢让你的主类扩展线程,只是创建一个可运行的内部类,在构造函数中使用它的倾向。这使得您的代码更具可读性,因为线程正在执行的工作与您的应用程序设置代码明显分离。

关于样式的一些注意事项,我不是让你的主类抛出异常的大爱好者,这通常意味着你没有清楚地了解你使用的不同的检查异常情况。 。除非你有一个非常具体的原因,否则通常这是不对的。

+0

预先感谢您。其实我实现了Runnable这是一个不错的主意,并使用ThreadPoolExecutor解决了jpountz提到的程序中的一个真正的bug。 – orezvani 2012-02-25 19:16:10

+0

“awaitTermination”的缺点是它不会等待所有线程完成,但会在n个时间单位后退出。 :-(一个循环是必要的, – 2012-06-14 19:04:20

+0

同意,这将证明IndexWriter并没有正常关闭,并且writer_lock仍然存在,即使索引编辑器没有操纵索引目录 – JasonHuang 2014-06-30 01:42:01