2016-09-29 43 views
1

我很努力将节点流转换为Rxjs观察对象。将节点流转换为Rx.js观察对象

当我尝试使用1个URL时,流式传输本身效果很好。但是,当我尝试通过URLS数组映射相同的函数时,出现错误。

我正在使用Rx.Node将流转换为Observable。

这是我目前正试图

// data_array is an array of 10 urls that I'm scraping data from. 
let parentStream = Rx.Observable.from(data_array); 

parentStream.map(createStream).subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); 

function createStream(url){ 
    return RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*'))) 
} 

但这是输出×10

RefCountObservable { 
source: 
    ConnectableObservable { 
    source: AnonymousObservable { source: undefined, __subscribe: [Function] }, 
_connection: null, 
_source: AnonymousObservable { source: [Object], __subscribe: [Function: subscribe] }, 
_subject: 
    Subject { 
    isDisposed: false, 
    isStopped: false, 
    observers: [], 
    hasError: false } }, 
_count: 0, 
_connectableSubscription: null } 

我首先想到flatMap会(在data_array中的网址数)工作,因为它是在一个可观察的平展observables ....但是当我尝试flatMap,我得到这个:

Complete 
Error TypeError: unknown type returned 

但是,如果我这样做:

这适用于1个URL,但我不能捕获所有的网址在data_array中的一个流。

let stream = RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*'))) 

stream.subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')) 

我觉得我误解的东西,不仅是因为它清除不工作的多个URL,但即使它在第二个例子中的工作....我得到“完成”之前,首先所有数据进来。

显然,我误解了一些东西。任何帮助将是美好的。谢谢。

* UPDATE *

我尝试了不同的路径,它的工作原理,但不使用节点流。节点流将是理想的,所以仍然想要使上述示例工作。

我接下来使用的方法是围绕我的网页抓取功能,这是以下承诺。这是有效的,但结果是十个巨大的数组包含每个数组中每个URL的所有数据。我真正想要的是一组对象,我可以在数据对象通过时组成一系列转换。

这里是不同的,但工作方式:

let parentStream = Rx.Observable.from(data_array); 

parentStream.map(url => { 
    return Rx.Observable.defer(() => { 
     return scrape(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]); 
    }) 
}) 
    .concatAll() 
    .subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); 

function scrape(url, selector, scope) { 
    return new Promise(
     (resolve, reject) => x(
      url, 
      selector, 
      scope 
     )((error, result) => error != null ? reject(error) : resolve(result)) 
    ); 
} 

回答

1

*解决方案* 我想通了。我已附上下面的解决方案:

取而代之的是使用RxNode,我选择使用Rx.Observable.fromEvent()。

节点流发出事件,无论是新数据,错误还是完成。

所以from事件静态运算符正在侦听'data'事件并为每个事件创建一个新的Observable。

然后我合并所有这些,并订阅。代码如下:

let parentStream = Rx.Observable.from(data_array); 
parentStream.map((url)=> { return createEventStream(url); }).mergeAll().subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); 

function createEventStream(url){ 
    return Rx.Observable.fromEvent(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*')), 'data'); 
}