2017-08-05 114 views
1

我试图有效地插入大量的数据(XML文件超过70GB的大小)没有崩溃我的MongoDB服务器。目前,这就是我在使用的NodeJS做xml-stream流插入XML数据的数据库

var fs = require('fs'), 
    path = require('path'), 
    XmlStream = require('xml-stream'), 
    MongoClient = require('mongodb').MongoClient, 
    assert = require('assert'), 
    ObjectId = require('mongodb').ObjectID, 
    url = 'mongodb://username:[email protected]:27017/mydatabase', 
    amount = 0; 

var stream = fs.createReadStream(path.join(__dirname, 'motor.xml')); 
var xml = new XmlStream(stream); 

xml.collect('ns:Statistik'); 
xml.on('endElement: ns:Statistik', function(item) { 
    var insertDocument = function(db, callback) { 
     db.collection('vehicles').insertOne(item, function(err, result) { 
      amount++; 
      if (amount % 1000 == 0) { 
       console.log("Inserted", amount); 
      } 
      callback(); 
     }); 
    }; 

    MongoClient.connect(url, function(err, db) { 
     insertDocument(db, function() { 
      db.close(); 
     }); 
    }); 
}); 

当我打电话xml.on()它基本上返回树/元素,我目前。由于这是JSON的直线,所以我可以将它作为参数提供给我的db.collection().insertOne()函数,并将其按照我的需要插入到数据库中。

所有代码的实际工作,因为它是现在,但经过约3000插入停止(约需10秒)。我怀疑这是因为我打开数据库连接,插入数据,然后每次在XML文件中看到一棵树时都关闭连接,在这种情况下,大约有3000次。

我可以在某种程度上将insertMany()函数合并为100个(或更多)的块,但我不太确定这将如何处理这一切,这些工作都是流式传输和异步处理。

所以我的问题是:如何插入大量XML(以JSON)到我的MongoDB数据库没有它崩溃?

回答

1

你假设.insertMany()比写每一次都好,所以它只是收集"stream"上的数据。

因为执行是“异步”,你通常要避免在栈上太多的主动呼叫,所以通常你调用.insertMany()然后.resume()一旦回调完成之前.pause()"stream"

var fs = require('fs'), 
    path = require('path'), 
    XmlStream = require('xml-stream'), 
    MongoClient = require('mongodb').MongoClient, 
    url = 'mongodb://username:[email protected]:27017/mydatabase', 
    amount = 0; 

MongoClient.connect(url, function(err, db) { 

    var stream = fs.createReadStream(path.join(__dirname, 'motor.xml')); 
    var xml = new XmlStream(stream); 

    var docs = []; 
    //xml.collect('ns:Statistik'); 

    // This is your event for the element matches 
    xml.on('endElement: ns:Statistik', function(item) { 
     docs.push(item);   // collect to array for insertMany 
     amount++; 

     if (amount % 1000 === 0) { 
      xml.pause();    // pause the stream events 
      db.collection('vehicles').insertMany(docs, function(err, result) { 
      if (err) throw err; 
      docs = [];    // clear the array 
      xml.resume();   // resume the stream events 
      }); 
     } 
    }); 

    // End stream handler - insert remaining and close connection 
    xml.on("end",function() { 
     if (amount % 1000 !== 0) { 
     db.collection('vehicles').insertMany(docs, function(err, result) { 
      if (err) throw err; 
      db.close(); 
     }); 
     } else { 
     db.close(); 
     } 
    }); 

}); 

甚至现代化不是有点:

const fs = require('fs'), 
     path = require('path'), 
     XmlStream = require('xml-stream'), 
     MongoClient = require('mongodb').MongoClient; 

const uri = 'mongodb://username:[email protected]:27017/mydatabase'; 

(async function() { 

    let amount = 0, 
     docs = [], 
     db; 

    try { 

    db = await MongoClient.connect(uri); 

    const stream = fs.createReadStream(path.join(__dirname, 'motor.xml')), 
      xml = new XmlStream(stream); 

    await Promise((resolve,reject) => { 
     xml.on('endElement: ns:Statistik', async (item) => { 
     docs.push(item); 
     amount++; 

     if (amount % 1000 === 0) { 
      try { 
      xml.pause(); 
      await db.collection('vehicle').insertMany(docs); 
      docs = []; 
      xml.resume(); 
      } catch(e) { 
      reject(e) 
      } 
     } 

     }); 

     xml.on('end',resolve); 

     xml.on('error',reject); 
    }); 

    if (amount % 1000 !== 0) { 
     await db.collection('vehicle').insertMany(docs); 
    } 

    } catch(e) { 
    console.error(e); 
    } finally { 
    db.close(); 
    } 

})(); 

注意,MongoClient连接实际上封装了所有的其他操作。你只需要连接一次,以及其他操作发生在对"stream"事件处理程序。

因此,对于你XMLStream的触发事件处理程序的表达式匹配和数据提取和收集到一个数组。每调用1000个项目.insertMany()就会插入文档,在“异步”调用上“暂停”和“恢复”。

一旦完成“结束”事件的"stream"解雇。这是关闭数据库连接的地方,事件循环将被释放并结束程序。

尽管可以通过允许各种各样的.insertMany()调用一次发生(通常为“合并大小”以避免溢出调用堆栈),从而获得某种程度的“并行性”,但基本上这是该过程的外观最简单的形式就是在等待其他异步I/O完成时简单地暂停。

注意:注释从你的原代码.collect()方法按照follow up question这似乎并不是必需的,实际上是保留真的应该每次使用后丢弃在存储节点写入数据库。

+0

哦,男孩,它看起来像它的作品!我试图基本上按照自己的意愿做出自己的决定,但我无法打开我的头。我的问题是,它给了我非常不一致的结果。如果我插入1000条记录,它实际上只会在数据库中显示300条记录(大约在那)。可能是因为我只是在完成之前随机关闭连接。非常感谢,尼尔! – MortenMoulder

+0

另一个说明:你有任何线索,为什么它开始真正去!大约75000插入后缓慢?当数据库为空时,我们正在谈论1000/sec,但是当我达到75000时,可能是100-200/sec。 – MortenMoulder

+0

@MortenMoulder使用'.insertMany()'可以看到显着的改进,但对于吞吐量取决于有多少数据,这是一个完全不同而且非常广泛的主题。没有具体细节就需要考虑太多的因素,比如什么索引(如果有),可用内存,写入分配和基本硬件。如果您有其他问题,通常最好[提出新问题](https://stackoverflow.com/questions/ask),您可以清楚地表达详细信息。 –