我需要构建一个处理大型CSV文件以用于bluebird.map()调用的函数。鉴于文件的可能大小,我想使用流式传输。NodeJS,promise,streams - 处理大型CSV文件
该函数应该接受一个流(一个CSV文件)和一个函数(处理来自流的块),并在文件读取结束(解析)或错误(拒绝)时返回一个承诺。
于是,我开始:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
// use readable or data event?
parser.on('readable', function() {
// call processor, which may be async
// how do I throttle the amount of promises generated
});
var db = pgp(api.config.mailroom.fileMakerDbConfig);
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
现在,我有两个相互关联的问题:
- 我需要节制的实际数据量进行处理,从而不会产生内存压力。
- 作为
processor
参数传递的函数通常是异步的,例如通过基于promise的库(现在:pg-promise
)将文件的内容保存到数据库。因此,它会在记忆中创造一个承诺并继续前进。
的pg-promise
图书馆功能来管理这一点,像page(),但我不能换我围绕如何流事件处理程序与这些承诺搭配方式前进。现在,我在readable
部分的每个read()
之后返回承诺,这意味着我创建了大量承诺的数据库操作,并最终出错,因为我遇到了进程内存限制。
有没有人有这样的工作示例,我可以用作跳点?
UPDATE:可能对皮肤猫不止一种方法,但这个工程:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
// some checks trimmed out for example
var db = pgp(api.config.mailroom.fileMakerDbConfig);
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
var readDataFromStream = function(index, data, delay) {
var records = [];
var record;
do {
record = parser.read();
if(record != null)
records.push(record);
} while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
parser.pause();
if(records.length)
return records;
};
var processData = function(index, data, delay) {
console.log('processData(' + index + ') > data: ', data);
parser.resume();
};
parser.on('readable', function() {
db.task(function(tsk) {
this.page(readDataFromStream, processData);
});
});
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
有人看到一个潜在的问题,这种做法?
看起来很整齐,如果这个工作,那么很棒的工作!我很高兴最近在'pg-promise'中加入'page'并不是徒劳的;) –
在readDataFromStream的末尾简化了它;)你不需要'return undefined',这就是发生了什么当你什么也没有返回时); –
实际上,这可能是一个问题......当你调用db.task时,哟不会处理它的结果,所以如果它拒绝,将会有一个错误诺言库,你的拒绝没有处理。 –