2016-09-23 57 views
1

我使用highland.js来处理使用流读取两个分隔符之间的内容的文件。我还使用async.js按顺序运行一系列http请求。使用异步处理高地流数据块

理想情况下,我想将来自高地的输出x作为第一个函数传递给async系列(链),以便为从流中提取的每个块执行HTTP请求。

这可能吗?如果是这样,这怎么能实现呢?

var async = require('async'); 
var _ = require('highland'); 


_(fs.createReadStream(files[0], { encoding: 'utf8' })) 
     .splitBy('-----BEGIN-----\n') 
     .splitBy('\n-----END-----\n') 
     .filter(chunk => chunk !== '') 
     .each(function (x) { 
     }).done(function() { 

    async.series([ 
     function(callback) { 
      setTimeout(function() { 
       console.log('Task 1'); 
       callback(null, 1); 
      }, 300); 
     }, 
     function(callback) { 
      setTimeout(function() { 
       console.log('Task 2'); 
       callback(null, 2); 
      }, 200); 
     }, 
    ], function(error, results) { 
     console.log(results); 
    }); 

});; 
+0

在这种情况下做了什么? – abdulbarik

+0

为什么'x'没有传递给'javascript'处的函数? – guest271314

+0

'done' http://highlandjs.org/#done是我工作代码的宿醉,并告诉流程结束。我可能会删除它。 – user1513388

回答

1

可以摆脱调用eachdone的。过滤后,您可以使用.toArray(callback)进行跟踪。回调传递一个包含高地结果的数组。你可能会重构这样

var Q = require('q'); 
var _ = require('highland'); 


_(fs.createReadStream(files[0], { encoding: 'utf8' })) 
     .splitBy('-----BEGIN-----\n') 
     .splitBy('\n-----END-----\n') 
     .filter(chunk => chunk !== '') 
     .each(asyncTasks); 

function asyncTasks(x) { // here, x will be each of the results from highland 
    async.series([ 
     // do something with x results 
     function(callback) { 
      console.log('Task 1'); 
      callback(null, 1); 
     }, 
     // do something else with x results 
     function(callback) { 
      console.log('Task 2'); 
      callback(null, 2); 
     }, 
    ], function(error, results) { 
     console.log(results); 
    }); 
} 

here是为toArray文档的链接。 toArray消耗流,正如done也。如果您有任何问题,请告诉我。

虽然老实说,我认为你会更好地使用promise来代替。尽管它的一部分只是个人喜好,部分原因是它使代码更具可读性。从what I've read,异步比承诺更高性能,但承诺的好处是你可以将结果从一个函数传递到下一个函数。因此,在您的示例中,您可以在第一部分中对x做一些处理,然后将修改后的结果传递给下一个函数,以及下一个函数等等。当您使用async.series时,您可以致电callback(null, result)完成每项功能,并且只有在系列最后完成时,才能获得结果,当您从callback的所有呼叫中获得所有结果时。现在,您可以随时将结果保存到async.series之外的某个变量,但这会使代码更加混乱。如果你想用承诺重写它,它看起来如下。我在这里使用q,但它只是您可以使用的许多承诺库中的一个。

var async = require('async'); 
    var _ = require('highland'); 


    _(fs.createReadStream(files[0], { encoding: 'utf8' })) 
      .splitBy('-----BEGIN-----\n') 
      .splitBy('\n-----END-----\n') 
      .filter(chunk => chunk !== '') 
      .each(asyncTasks); 

    function asyncTasks(x) { // here, x will be an array of the results from highland 
     return asyncTask1(x) 
       .then(asyncTask2) 
       .then(asyncTask3) 
    } 

    function asyncTask1(x) { 
     var deferred = Q.defer(); 

     // do some stuff 

     if (// some error condition) { 
     deferred.reject(); 
     } else { 
     deferred.resolve(x); // or pass along some modified version of x 
     } 

     return deferred.promise; 
    } 

    function asyncTask2(x) { 
     // same structure as above 
    } 

    function asyncTask3(x) { 
     // same structure as above 
    } 

这些天的一些异步API已经开始返回承诺,除了接受回调或有时代替。所以这是一件好事情,以获得舒适。承诺是非常有用的。你可以阅读更多关于他们herehere

+0

谢谢,我正在尝试这个。 Q为什么更好,它看起来如何使用它。如果它更好,我会很乐意切换, – user1513388

+0

x看起来像是所有块的数组,与仅需要传递给异步链的块相对。 – user1513388

+0

哦,好的。对不起,我误解了你的需求。我更新了代码。只是轻微的变化。现在你可以调用它们,然后像​​以前一样传递相同的函数。现在,将会从高地调用每个结果的asyncTasks。每个消耗流,所以你不需要调用'done()以及'。 –