2016-09-22 85 views
0

为了快速创建 Lucene 索引(v6.1),我想把用 Slick 3.1(Scala)取得的数据拆分成若干部分(块),以便把不同的数据集传给不同的线程,从而加速索引过程。我在 Scala 中编写了以下代码从 MySQL 获取数据。如何将 Slick 3.1(Scala)中的数据拆分为 4 部分?

/**
 * Service exposing read access to the stored notes.
 *
 * `notes` and `db` are not defined in this class; presumably they are supplied by
 * `NoteEntityTable` and the `databaseService` import respectively (confirm there).
 */
class NotesService(val databaseService: DatabaseService)(implicit executionContext: ExecutionContext)
    extends NoteEntityTable {

    import databaseService._
    import databaseService.driver.api._
    import com.github.t3hnar.bcrypt._

    /** Runs the `notes` query and returns the resulting rows as a future. */
    def getNotes(): Future[Seq[NoteEntity]] = {
        val selectAll = notes.result
        db.run(selectAll)
    }
}
/** One note record; `id` is optional and defaults to `None`. */
case class NoteEntity(
    id: Option[Long] = None,
    title: String,
    teaser: String,
    description: String
)

代码NotesService

/**
 * Service exposing read access to the stored notes.
 *
 * `notes` and `db` are not defined in this class; presumably they are supplied by
 * `NoteEntityTable` and the `databaseService` import respectively (confirm there).
 */
class NotesService(val databaseService: DatabaseService)(implicit executionContext: ExecutionContext)
    extends NoteEntityTable {

    import databaseService._
    import databaseService.driver.api._
    import com.github.t3hnar.bcrypt._

    /** Runs the `notes` query and returns the resulting rows as a future. */
    def getNotes(): Future[Seq[NoteEntity]] = {
        val selectAll = notes.result
        db.run(selectAll)
    }
}

要提取NotesService数据我用:

/**
 * Opens a Lucene index writer on `/var/www/html/Index`, builds one `IndexTh` per
 * available processor and starts them all.
 *
 * Every thread is constructed with the same `notesService` and `writer`; the notes
 * requested here are not split between the threads.
 */
def setI = {
    val workerCount = Runtime.getRuntime().availableProcessors()
    val indexPath = Paths.get("/var/www/html/Index")

    val config = new IndexWriterConfig(new StandardAnalyzer())
    config.setOpenMode(OpenMode.CREATE_OR_APPEND)
    config
        .setRAMBufferSizeMB(500)
        .setMaxBufferedDocs(2)
        .setMergeScheduler(new ConcurrentMergeScheduler())

    val writer = new IndexWriter(FSDirectory.open(indexPath), config)

    // The query is issued here, but its result is not consumed anywhere in this method.
    val pendingNotes = notesService.getNotes()

    // One worker per processor, each built from the same service and writer.
    val workers = Array.fill(workerCount)(new IndexTh(notesService, writer))

    workers.zipWithIndex.foreach { case (worker, idx) =>
        worker.start()
        println(s"Thread ${idx} Started!")
    }
}

就是这一行:

// Slot i receives a new IndexTh built from the shared notesService and writer; no per-thread data subset is passed.
threads(i) = new IndexTh(notesService, writer) 

我该如何拆分来自 notesService 的数据并传递给线程?如何将 notes 中的数据分成多个块?我想要这样的数据:

假设notesService.getNotes()检索20000行数据。现在我想将这些行分成4000行的5部分,这样每4000行数据就可以传递给不同的线程。

+0

你说的“块”是什么意思?分页吗? – Roman

+0

我想把不同的数据集(由主数据集拆分而来)传递给多个线程 – Sujit

+1

嗯,我还是不完全明白问题是什么。也许给出一个你期望的输出示例会有帮助。 – Roman

回答

0

经过长时间的研究,我终于找到了答案:

使用线程:

/**
 * Fetches all notes, splits them into at most one chunk per available processor and
 * starts one `Index` thread per chunk, all sharing a single Lucene `IndexWriter`.
 *
 * The chunk size is the row count divided by the processor count, rounded up, so the
 * number of chunks never exceeds the processor count and every row lands in exactly
 * one chunk. When there are no rows, no writer is opened and no thread is started.
 *
 * Blocks the calling thread until the notes query has completed. The started threads
 * are not joined.
 *
 * NOTE(review): the writer is never closed in this method; presumably `Index` or the
 * caller is responsible for committing and closing it — confirm.
 */
def setI: Unit = {
    val numThreads = Runtime.getRuntime().availableProcessors()

    // Wait for the query once and reuse the rows for both sizing and splitting.
    val allNotes = Await.result(notesService.getNotes(), Duration.Inf)

    // Integer ceiling division; clamped to 1 because grouped() rejects a size of 0.
    val chunkSize = math.max(1, (allNotes.length + numThreads - 1) / numThreads)
    val chunks = allNotes.grouped(chunkSize).toVector

    // With no rows there is nothing to index, so avoid opening a writer nobody would use.
    if (chunks.nonEmpty) {
        val indexStoreDir = Paths.get("/var/www/html/LuceneIndex")
        val writerConfig = new IndexWriterConfig(new StandardAnalyzer())
        writerConfig.setOpenMode(OpenMode.CREATE_OR_APPEND)
        writerConfig.setRAMBufferSizeMB(500)
            .setMaxBufferedDocs(10)
            .setMergeScheduler(new ConcurrentMergeScheduler())
        val writer = new IndexWriter(FSDirectory.open(indexStoreDir), writerConfig)

        // Exactly one thread per chunk, so no slot is left unfilled and no index overflows.
        val threads = chunks.zipWithIndex.map { case (chunk, i) => new Index(chunk, writer, i) }

        threads.zipWithIndex.foreach { case (thread, i) =>
            println("Thread :" + thread.getName + " => " + (i + 1) + " Started!")
            thread.start()
        }
    }
}

使用 Scala 的 Future:

/**
 * Rebuilds the Lucene index at `/var/www/html/LuceneIndex` from all notes, indexing
 * each note through `writeToDoc`.
 *
 * Blocks until the notes query and every per-note indexing future have completed,
 * then closes the writer. If the query or any indexing future fails, the writer is
 * still closed and the failure is rethrown by `Await.result`.
 *
 * @return one `Unit` per indexed note
 */
def setFutureIndex = {
    val indexStoreDir = Paths.get("/var/www/html/LuceneIndex")
    val writerConfig = new IndexWriterConfig(new StandardAnalyzer())
    writerConfig.setOpenMode(OpenMode.CREATE)
    writerConfig.setRAMBufferSizeMB(500)
    val writer = new IndexWriter(FSDirectory.open(indexStoreDir), writerConfig)

    // flatMap + traverse keeps every per-note future in the chain, so awaiting the
    // result really waits for indexing to finish instead of only for it to be started.
    val indexingFuture = notesService.getNotes().flatMap { notes =>
        Future.traverse(notes)(note => writeToDoc(note, writer))
    }

    // Closing is safe here because all indexing work is done once Await returns.
    try Await.result(indexingFuture, Duration.Inf)
    finally writer.close()
}

    /**
     * Asynchronously indexes a single note: builds a document with stored `title`,
     * `teaser` and `description` text fields, adds it to `writer` and commits.
     *
     * The title field is prefixed with the note id wrapped as ` {##id##} `.
     * The returned future fails if `note.id` is empty, because the id is read with `.get`.
     */
    def writeToDoc(note: NoteEntity, writer: IndexWriter) = Future {
        val noteId = note.id.get
        println(s"*****Indexing: ${noteId}")

        val document = new Document()
        document.add(new TextField("title", s" {##${noteId}##} ${note.title}", Field.Store.YES))
        document.add(new TextField("teaser", note.teaser, Field.Store.YES))
        document.add(new TextField("description", note.description, Field.Store.YES))

        writer.addDocument(document)

        // A commit is issued after every single document.
        writer.commit()
        println(s"*****Completed: ${noteId}")
    }