2017-02-17 201 views
13

我们做约50万的元素读取(使用xml-stream)的XML文件并执行把它们插入到MongoDB的是这样的:如何在node.js断开连接期间缓冲MongoDB插入?

xml.on(`endElement: product`, writeDataToDb.bind(this, "product")); 

插入在writeDataToDb(type, obj)看起来是这样的:

collection.insertOne(obj, {w: 1, wtimeout: 15000}).catch((e) => { }); 

现在蒙戈时连接断开连接,xml流仍会读取,并且控制台将充满错误消息(无法插入,断开连接,EPIPE断开...)。

docs它说:

当您关闭mongod的过程中,驾驶员停止处理操作,并保持缓冲它们由于bufferMaxEntries是-1默认情况下,这意味着缓存中的所有操作。

这个缓冲区实际上做了什么?

当我们插入数据并关闭蒙戈服务器我们注意到,这个事情得到缓冲,然后我们把蒙戈服务器备份,本地驱动程序成功地重新连接和节点恢复插入数据,但缓存文件(蒙戈beeing离线时)不要再插入。

所以我质疑这个缓冲区及其用法。

目标:

我们正在寻找保持缓冲区插入的最佳方式,直到蒙戈回来(在根据wtimeout 15000milliseconds),并让然后将缓冲的文件或利用xml.pause();xml.resume()这我们尝试没有成功。

基本上,我们需要一点帮助,以便如何处理断开连接而没有数据丢失或中断。

+0

不能复制此,无论是在使用'XML的stream'插入一旦周一缓冲的对象的文档和测试的示例去服务器备份..也许你可以发布更多的代码/提供有关您的设置的更多信息? – cviejo

+0

@cviejo我不能分享我的脚本,因为它们是公司相关的,但是您会介意将您尝试复制的脚本发给我吗? Gist/pastebin就可以了。 – DanFromGermany

回答

1

我不知道Mongodb驱动程序和这个缓冲区条目。也许它只保留特定场景中的数据。

所以我会用更一般的方法来回答这个问题,它可以用于任何数据库。

总之,你有两个问题:

  1. 您还没有从失败的尝试恢复
  2. XML流发送数据太快

要处理的第一个问题,你需要实现一种重试算法,可确保在放弃之前进行多次尝试。

要处理第二个问题,您需要在xml流上实现背压。您可以使用pause方法,resume方法和输入缓冲区来完成此操作。

var Promise = require('bluebird'); 
var fs = require('fs'); 
var Xml = require('xml-stream'); 

var fileStream = fs.createReadStream('myFile.xml'); 
var xml = new Xml(fileStream); 

// simple exponential retry algorithm based on promises 
function exponentialRetry(task, initialDelay, maxDelay, maxRetry) { 
    var delay = initialDelay; 
    var retry = 0; 
    var closure = function() { 
     return task().catch(function(error) { 
      retry++; 
      if (retry > maxRetry) { 
       throw error 
      } 
      var promise = Promise.delay(delay).then(closure); 
      delay = Math.min(delay * 2, maxDelay); 
      return promise; 
     }) 
    }; 
    return closure(); 
} 

var maxPressure = 100; 
var currentPressure = 0; 
var suspended = false; 
var stopped = false; 
var buffer = []; 

// handle back pressure by storing incoming tasks in the buffer 
// pause the xml stream as soon as we have enough tasks to work on 
// resume it when the buffer is empty 
function writeXmlDataWithBackPressure(product) { 
    // closure used to try to start a task 
    var tryStartTask = function() { 
     // if we have enough tasks running, pause the xml stream 
     if (!stopped && !suspended && currentPressure >= maxPressure) { 
      xml.pause(); 
      suspended = true; 
      console.log("stream paused"); 
     } 
     // if we have room to run tasks 
     if (currentPressure < maxPressure) { 
      // if we have a buffered task, start it 
      // if not, resume the xml stream 
      if (buffer.length > 0) { 
       buffer.shift()(); 
      } else if (!stopped) { 
       try { 
        xml.resume(); 
        suspended = false; 
        console.log("stream resumed"); 
       } catch (e) { 
        // the only way to know if you've reached the end of the stream 
        // xml.on('end') can be triggered BEFORE all handlers are called 
        // probably a bug of xml-stream 
        stopped = true; 
        console.log("stream end"); 
       } 
      } 
     } 
    }; 

    // push the task to the buffer 
    buffer.push(function() { 
     currentPressure++; 
     // use exponential retry to ensure we will try this operation 100 times before giving up 
     exponentialRetry(function() { 
      return writeDataToDb(product) 
     }, 100, 2000, 100).finally(function() { 
      currentPressure--; 
      // a task has just finished, let's try to run a new one 
      tryStartTask(); 
     }); 
    }); 

    // we've just buffered a task, let's try to run it 
    tryStartTask(); 
} 

// write the product to database here :) 
function writeDataToDb(product) { 
    // the following code is here to create random delays and random failures (just for testing) 
    var timeToWrite = Math.random() * 100; 
    var failure = Math.random() > 0.5; 
    return Promise.delay(timeToWrite).then(function() { 
     if (failure) { 
      throw new Error(); 
     } 
     return null; 
    }) 
} 

xml.on('endElement: product', writeXmlDataWithBackPressure); 

玩它,把一些console.log了解它的行为。 我希望这会帮助你解决你的问题:)

+0

这基本上是一个很好的实现,但我希望能够利用mongo的内部写入关注/写入缓冲区 - 请查看[本页](https://mongodb.github.io/node-mongodb- native/drivers-articles/anintroductionto1_4_and_2_6.html)和关键字'bufferMaxEntries'。 – DanFromGermany

2

插入500K元素与insertOne()是一个非常糟糕的主意。您应该使用bulk operations,它允许您在单个请求中插入许多文档。 (这里例如10000,因此它可以在50个单请求完成) 为了避免缓冲问题,您可以手动处理:

  1. 禁用缓冲带bufferMaxEntries: 0
  2. 设置重新连接性能:reconnectTries: 30, reconnectInterval: 1000
  3. 创建批量操作并用10000个项目填充
  4. 暂停xml阅读器。尝试插入10000个项目。如果失败,重试每3000ms,直到它成功
  5. 你可能会面临一些重复的ID问题,如果批量操作的执行过程中中断,所以不理会他们(错误代码:11000)

这里是一个示例脚本:

var fs = require('fs') 
var Xml = require('xml-stream') 

var MongoClient = require('mongodb').MongoClient 
var url = 'mongodb://localhost:27017/test' 

MongoClient.connect(url, { 
    reconnectTries: 30, 
    reconnectInterval: 1000, 
    bufferMaxEntries: 0 
}, function (err, db) { 
    if (err != null) { 
    console.log('connect error: ' + err) 
    } else { 
    var collection = db.collection('product') 
    var bulk = collection.initializeUnorderedBulkOp() 
    var totalSize = 500001 
    var size = 0 

    var fileStream = fs.createReadStream('data.xml') 
    var xml = new Xml(fileStream) 
    xml.on('endElement: product', function (product) { 
     bulk.insert(product) 
     size++ 
     // if we have enough product, save them using bulk insert 
     if (size % 10000 == 0) { 
     xml.pause() 
     bulk.execute(function (err, result) { 
      if (err == null) { 
      bulk = collection.initializeUnorderedBulkOp() 
      console.log('doc ' + (size - 10000) + ' : ' + size + ' saved on first try') 
      xml.resume() 
      } else { 
      console.log('bulk insert failed: ' + err) 
      counter = 0 
      var retryInsert = setInterval(function() { 
       counter++ 
       bulk.execute(function (err, result) { 
       if (err == null) { 
        clearInterval(retryInsert) 
        bulk = collection.initializeUnorderedBulkOp() 
        console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries') 
        xml.resume() 
       } else if (err.code === 11000) { // ignore duplicate ID error 
        clearInterval(retryInsert) 
        bulk = collection.initializeUnorderedBulkOp() 
        console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries') 
        xml.resume() 
       } else { 
        console.log('failed after first try: ' + counter, 'error: ' + err) 
       } 
       }) 
      }, 3000) // retry every 3000ms until success 
      } 
     }) 
     } else if (size === totalSize) { 
     bulk.execute(function (err, result) { 
      if (err == null) { 
      db.close() 
      } else { 
      console.log('bulk insert failed: ' + err) 
      } 
     }) 
     } 
    }) 
    } 
}) 

示例日志输出:

doc 0 : 10000 saved on first try 
doc 10000 : 20000 saved on first try 
doc 20000 : 30000 saved on first try 
[...] 
bulk insert failed: MongoError: interrupted at shutdown // mongodb server shutdown 
failed after first try: 1 error: MongoError: no connection available for operation and number of stored operation > 0 
failed after first try: 2 error: MongoError: no connection available for operation and number of stored operation > 0 
failed after first try: 3 error: MongoError: no connection available for operation and number of stored operation > 0 
doc 130000 : 140000 saved after 4 tries 
doc 140000 : 150000 saved on first try 
[...] 
+0

您的回答没有提供有关mongo写入缓冲区的信息,也没有解决如何在复制副本集或断开连接期间插入所有文档的解决方案。有关批量插入的信息很有趣,我会仔细研究一下,谢谢! – DanFromGermany

+0

@DanFromGermany是的,因为对我来说,它看起来像你试图解决错误的问题:真正的问题是你的应用程序与数据库断开连接。减少对数据库的调用时,自动重新连接会更容易,因此不需要进行写入缓冲 – felix

+0

我的应用程序**不会从数据库中断开连接。我希望编写应用程序**,以便在**断开连接时*或*副本集中的主交换机断言重新连接并写入所有数据。 – DanFromGermany