2016-11-24 71 views
0

我需要读取数千行的日志文件并将每行写入Mongo数据库。我正在使用节点流读取文件。我正在使用'split'npm软件包将文件分成'行'。由于网络的考虑,MongoDB的写入将比读取的日志文件花费更多的时间。Node.js流写入MongoDB - 关注性能

我的核心代码如下所示:

var readableStream = fs.createReadStream(filename); 

      readableStream 
       .pipe(split()) // This splits the data into 'lines' 
       .on('data', function (chunk) { 

        chunkCount++; 
        slowAsyncFunctionToWriteLogEntryToDatabase(chunk); // This will take ages 

       }) 
       .on('end', function() { 
        // resolve the promise which bounds this process 
        defer.resolve({v:3,chunkCount: chunkCount}) 

       }); 

我需要担心的MongoDB系统将写入次数敲定排队?据推测,节点管道背压机制不会知道大量数据库写入正在排队?有什么方法可以“减慢”可读流,以便它在读取日志文件中的下一行之前等待每个MongoDB插入完成?我是否不必要地担心?

+0

我想你可以读取整个日志文件,然后用db.collection.insertMany()在一个db调用中插入所有文档。这将是更快 – felix

回答

1

由于使用pause()resume()似乎有一些问题。我将编写另一个选项,使用Transform流。

var Transform = require('stream').Transform; 

var myTransform = new Transform({ 
    transform(chunk, encoding, cb) { 
     chunkCount++; 

     syncFunctionToWriteLogEntryWithCallback(chunk, function() { 
     cb(); 
     }); 
    }, 

    flush(cb) { 
     chunkCount++; 
     syncFunctionToWriteLogEntryWithCallback(chunk, function() { 
     cb(); 
     }); 
    } 
}); 

readableStream 
     .pipe(split()) 
     .pipe(myTransform); 

使用转换流允许您在完成流处理后提供回调。

+1

太好了,现在它的行为完全符合我的想法 - 它允许在流移动到下一行之前对每行进行完全处理。完美的解决方案,虽然性能现在很糟糕,可能是因为我向Mongo推送请求的方式!非常感谢您的帮助。 – Journeyman

+0

性能改进可能会使用另一个mongoDB插入函数。有['bulk]](https://docs.mongodb.com/v3.2/reference/method/Bulk.insert/) – drinchev

+0

我改变了插入插入文档的数组而不是单个文档,改进的性能。一切都准备好了!再次感谢您的帮助:) – Journeyman

0

您可以在可读流中使用pause method在您将块写入到mongodb时停止流。

readableStream 
      .pipe(split()) // This splits the data into 'lines' 
      .on('data', function (chunk) { 

       readableStream.pause() 

       chunkCount++; 

       syncFunctionToWriteLogEntryWithCallback(chunk, function() { 
        readableStream.resume(); 
       }); 

      }) 
      .on('end', function() { 
       // resolve the promise which bounds this process 
       defer.resolve({v:3,chunkCount: chunkCount}) 

      }); 

我不认为在这种情况下MongoDB会出现严重问题。

+0

谢谢。我确实看过.pause(),但文档声明.pause()不会立即停止流,但可能会在暂停发生之前传递几个其他块。如果简历已经在那个时候被调用过,那么在我看来,期望的效果可能会被.pause()的这个方面完全否定。但我会尝试一下,看看它的表现如何。 – Journeyman

+0

所以,我加了pause()和resume()。对于一个有38508行的日志文件,当我们到达流的'结束'时,那么37913个Mongo写入仍然是'排队'等待处理,这表明pause()/ resume()机制并没有真正帮助节流循环。我真的很喜欢一种保持读/写的机制。 – Journeyman

+0

嗯。好的!也许使用'Transform stream'来实现效率会更高。我会试着举一个例子 – drinchev