2015-10-14 80 views
7

我需要构建一个处理大型CSV文件以用于bluebird.map()调用的函数。鉴于文件的可能大小,我想使用流式传输。NodeJS,promise,streams - 处理大型CSV文件

该函数应该接受一个流(一个CSV文件)和一个函数(处理来自流的块),并在文件读取结束(解析)或错误(拒绝)时返回一个承诺。

于是,我开始:

'use strict'; 

var _ = require('lodash'); 
var promise = require('bluebird'); 
var csv = require('csv'); 
var stream = require('stream'); 

var pgp = require('pg-promise')({promiseLib: promise}); 

api.parsers.processCsvStream = function(passedStream, processor) { 

    var parser = csv.parse(passedStream, {trim: true}); 
    passedStream.pipe(parser); 

    // use readable or data event? 
    parser.on('readable', function() { 
    // call processor, which may be async 
    // how do I throttle the amount of promises generated 
    }); 

    var db = pgp(api.config.mailroom.fileMakerDbConfig); 

    return new Promise(function(resolve, reject) { 
    parser.on('end', resolve); 
    parser.on('error', reject); 
    }); 

} 

现在,我有两个相互关联的问题:

  1. 我需要节制的实际数据量进行处理,从而不会产生内存压力。
  2. 作为processor参数传递的函数通常是异步的,例如通过基于promise的库(现在:pg-promise)将文件的内容保存到数据库。因此,它会在记忆中创造一个承诺并继续前进。

pg-promise图书馆功能来管理这一点,像page(),但我不能换我围​​绕如何流事件处理程序与这些承诺搭配方式前进。现在,我在readable部分的每个read()之后返回承诺,这意味着我创建了大量承诺的数据库操作,并最终出错,因为我遇到了进程内存限制。

有没有人有这样的工作示例,我可以用作跳点?

UPDATE:可能对皮肤猫不止一种方法,但这个工程:

'use strict'; 

var _ = require('lodash'); 
var promise = require('bluebird'); 
var csv = require('csv'); 
var stream = require('stream'); 

var pgp = require('pg-promise')({promiseLib: promise}); 

api.parsers.processCsvStream = function(passedStream, processor) { 

    // some checks trimmed out for example 

    var db = pgp(api.config.mailroom.fileMakerDbConfig); 
    var parser = csv.parse(passedStream, {trim: true}); 
    passedStream.pipe(parser); 

    var readDataFromStream = function(index, data, delay) { 
    var records = []; 
    var record; 
    do { 
     record = parser.read(); 
     if(record != null) 
     records.push(record); 
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency)) 
    parser.pause(); 

    if(records.length) 
     return records; 
    }; 

    var processData = function(index, data, delay) { 
    console.log('processData(' + index + ') > data: ', data); 
    parser.resume(); 
    }; 

    parser.on('readable', function() { 
    db.task(function(tsk) { 
     this.page(readDataFromStream, processData); 
    }); 
    }); 

    return new Promise(function(resolve, reject) { 
    parser.on('end', resolve); 
    parser.on('error', reject); 
    }); 
} 

有人看到一个潜在的问题,这种做法?

+0

看起来很整齐,如果这个工作,那么很棒的工作!我很高兴最近在'pg-promise'中加入'page'并不是徒劳的;) –

+0

在readDataFromStream的末尾简化了它;)你不需要'return undefined',这就是发生了什么当你什么也没有返回时); –

+0

实际上,这可能是一个问题......当你调用db.task时,哟不会处理它的结果,所以如果它拒绝,将会有一个错误诺言库,你的拒绝没有处理。 –

回答

3

查找下面的正确执行相同类型的任务的完整应用程序:它将文件读取为流,将其解析为CSV并将每行插入到数据库中。

const fs = require('fs'); 
const promise = require('bluebird'); 
const csv = require('csv-parse'); 
const pgp = require('pg-promise')({promiseLib: promise}); 

const cn = "postgres://postgres:[email protected]:5432/test_db"; 
const rs = fs.createReadStream('primes.csv'); 

const db = pgp(cn); 

function receiver(_, data) { 
    function source(index) { 
     if (index < data.length) { 
      // here we insert just the first column value that contains a prime number; 
      return this.none('insert into primes values($1)', data[index][0]); 
     } 
    } 

    return this.sequence(source); 
} 

db.task(t => { 
    return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver); 
}) 
    .then(data => { 
     console.log('DATA:', data); 
    } 
    .catch(error => { 
     console.log('ERROR:', error); 
    }); 

注意,我改变的唯一事情:利用图书馆csv-parse代替csv,作为一个更好的选择。

spex库中增加使用方法stream.read,它正确地服务于Readable流以用于承诺。

+0

在查询(“INSERT ...”)完成后,不会尝试从'parser'读取下一个项目,无论如何下一个项目是否已经可读?或者'parser.read()'返回一个promise? – Bergi

+0

另外,OP正在寻找诺言返回的'处理器'回调函数是怎么回事? – Bergi

+0

@Bergi我的理解是,parser.read()是同步的,它显示的方式。如果事实证明不是这样,那么很明显,它需要被包裹在承诺中。 “可读”会被触发一次,而不是每次读取操作,这是我的理解。至于承诺返还处理器,他只是在数据处理完成时寻找一个解决方案,而在失败的情况下则拒绝一个拒绝,我的示例提供了这种解决方案,即任务将相应地解决/拒绝。 –

1

那么说,你不想流式传输,但某种数据块? ;-)

你知道吗https://github.com/substack/stream-handbook

我认为最简单的方法不改变你的架构会是某种承诺池。例如https://github.com/timdp/es6-promise-pool

+0

那么,我曾想过在函数中使用'async.queue',并返回最终完成文件的承诺(或不)。但是,我想知道如何将Bluebird等承诺库与典型的基于流的大型文件处理联系起来。 ('pg-promise'包含'spex',它提供了更高级别的promise函数) – alphadogg

6

你可能想看看promise-streams

var ps = require('promise-streams'); 
passedStream 
    .pipe(csv.parse({trim: true})) 
    .pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row))) 
    .wait().then(_ => { 
    console.log("All done!"); 
    }); 

工程与背压和一切。