2014-09-01 55 views
1

我试图用mongoose将大数据集插入到mongodb中。但在此之前,我需要确保我的for循环正常工作。节点使用猫鼬插入大数据

// basic schema settings 
var mongoose = require('mongoose'); 
var Schema = mongoose.Schema; 
var TempcolSchema = new Schema({ 
    cid: { 
     type: Number, 
     required: true 
    }, 
    loc:[] 
}); 
TempcolSchema.index({ 
    'loc': "sphere2d" 
}); 


// we can easily see from the output that the forloop runs correctly 
mongoose.connect('mongodb://localhost/mean-dev', function(err){ 
    for (var i = 0; i < 10000000; i++) { 
     var c = i; 
     console.log(c); 
    } 
}); 

输出为1,2,3,4,......等

现在我想添加一个猫鼬保存语句转换成for循环。

mongoose.connect('mongodb://localhost/mean-dev', function(err){ 
    var Tempcol = mongoose.model('Tempcol', TempcolSchema); 

    for (var i = 0; i < 10000000; i++) { 
     var c = i; 
     console.log(c); 
     var lon = parseInt(c/100000); 
     var lat = c%100000; 
     new Tempcol({cid: Math.random(), loc: [lon, lat]}).save(function(err){}); 
    } 
}); 

输出仍然是1,2,3,4,......但是for循环一段时间后停止,并说是达到最大堆栈和有某种记忆的问题。另外,当我检查集合时,我意识到根本没有插入任何数据点。

因此,有谁知道可能发生了什么?谢谢。

回答

2

这里的问题是,您正在运行的循环并未等待每个操作完成。所以实际上你只是排队等待1000年的.save()请求并试图同时运行它们。你不能在合理的限制内做到这一点,因此你会得到错误回应。

async模块在处理该迭代器的回调时有多种迭代方法,其中最简单的直接方法是whilst。猫鼬也为你处理连接管理,而不需要回调中嵌入,因为模型是连接识别:

var tempColSchema = new Schema({ 
    cid: { 
     type: Number, 
     required: true 
    }, 
    loc:[] 
}); 

var TempCol = mongoose.model("TempCol", tempColSchema); 

mongoose.connect('mongodb://localhost/mean-dev'); 

var i = 0; 
async.whilst(
    function() { return i < 10000000; }, 
    function(callback) { 
     i++; 
     var c = i; 
     console.log(c); 
     var lon = parseInt(c/100000); 
     var lat = c%100000; 
     new Tempcol({cid: Math.random(), loc: [lon, lat]}).save(function(err){ 
      callback(err); 
     });    
    }, 
    function(err) { 
     // When the loop is complete or on error 
    } 
); 

不是最伟大的方式做到这一点,但它仍然是一个写在同一时间,你可以使用其他方法来“治理”并发操作,但这至少不会炸毁调用堆栈。

形成MongoDB 2.6及更高版本,您可以使用Bulk Operations API来一次处理多个写入服务器。所以这个过程是相似的,但这个时候,你可以在一个单一的写入和响应,这是更快一次发送1000到服务器:

var tempColSchema = new Schema({ 
    cid: { 
     type: Number, 
     required: true 
    }, 
    loc:[] 
}); 

var TempCol = mongoose.model("TempCol", tempColSchema); 

mongoose.connect('mongodb://localhost/mean-dev'); 

mongoose.on("open",function(err,conn) { 

    var i = 0; 
    var bulk = TempCol.collection.initializeOrderedBulkOp(); 

    async.whilst(
     function() { return i < 10000000; }, 
     function(callback) { 
     i++; 
     var c = i; 
     console.log(c); 
     var lon = parseInt(c/100000); 
     var lat = c%100000; 

     bulk.insert({ "cid": Math.random(), "loc": [ lon, lat ] }); 

     if (i % 1000 == 0) { 
      bulk.execute(function(err,result) { 
       bulk = TempCol.collection.initializeOrderedBulkOp(); 
       callback(err); 
      }); 
     } else { 
      process.nextTick(callback); 
     } 
     }, 
     function(err) { 
     // When the loop is complete or on error 

     // If you had a number not plainly divisible by 1000 
     if (i % 1000 != 0) 
      bulk.execute(function(err,result) { 
       // possibly check for errors here 
      }); 
     } 
    ); 

}); 

,实际上是利用其尚未在本地驱动程序的方法暴露在猫鼬身上,所以需要额外注意确保连接可用。这只是一个例子,但不是唯一的方法,但重点是猫鼬的“神奇”连接并不是建立在这里,所以你应该确定它已经建立。

你有一个要处理的项目的数量,但不是你应该在最后一个块中调用bulk.execute()以及显示的情况,但它取决于响应模数的数量。

重点是不将堆栈操作增加到不合理的大小,并保持处理的有限性。这里的流程控制允许在进行下一次迭代之前需要一些时间才能真正完成的操作。所以无论是批量更新还是一些额外的并行排队是您想要的最佳性能。

如果您不希望写入错误是致命的,而是以不同的方式处理这些错误,那么也有.initializeUnorderedBulkOp()表单。大多数情况下,请参阅Bulk API的官方文档以及如何解释所给回复的响应。

+0

非常感谢。我会稍后尝试代码! – 2014-09-04 16:41:40

+0

我试过你的方法,它按预期工作。但我还有一个问题...而不是回调(错误)我把setTimeout(回调(错误),10000),并希望延迟所有回调10秒。它不像我想象的那样工作。我的推理有什么问题吗?谢谢。 – 2014-09-04 16:59:19

+0

@Chandler这里的关键是你不能使用标准的'for'循环,并且需要循环来要求回调,因为这里每种情况都有。我不确定根据您的评论,您在代码中试图设置超时或为了什么目的。事实上,如果你有另一个问题,最好再提出一个问题。这个答案告诉你有关回调控制循环和插入的最佳方法。 – 2014-09-04 22:12:27