我很努力将节点流转换为Rxjs观察对象。将节点流转换为Rx.js观察对象
当我尝试使用1个URL时,流式传输本身效果很好。但是,当我尝试通过URLS数组映射相同的函数时,出现错误。
我正在使用Rx.Node将流转换为Observable。
这是我目前正试图
// data_array is an array of 10 urls that I'm scraping data from.
let parentStream = Rx.Observable.from(data_array);
parentStream.map(createStream).subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete'));
function createStream(url){
return RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*')))
}
但这是输出×10
RefCountObservable {
source:
ConnectableObservable {
source: AnonymousObservable { source: undefined, __subscribe: [Function] },
_connection: null,
_source: AnonymousObservable { source: [Object], __subscribe: [Function: subscribe] },
_subject:
Subject {
isDisposed: false,
isStopped: false,
observers: [],
hasError: false } },
_count: 0,
_connectableSubscription: null }
我首先想到flatMap会(在data_array中的网址数)工作,因为它是在一个可观察的平展observables ....但是当我尝试flatMap,我得到这个:
Complete
Error TypeError: unknown type returned
但是,如果我这样做:
这适用于1个URL,但我不能捕获所有的网址在data_array中的一个流。
let stream = RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*')))
stream.subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete'))
我觉得我误解的东西,不仅是因为它清除不工作的多个URL,但即使它在第二个例子中的工作....我得到“完成”之前,首先所有数据进来。
显然,我误解了一些东西。任何帮助将是美好的。谢谢。
* UPDATE *
我尝试了不同的路径,它的工作原理,但不使用节点流。节点流将是理想的,所以仍然想要使上述示例工作。
我接下来使用的方法是围绕我的网页抓取功能,这是刮以下承诺。这是有效的,但结果是十个巨大的数组包含每个数组中每个URL的所有数据。我真正想要的是一组对象,我可以在数据对象通过时组成一系列转换。
这里是不同的,但工作方式:
let parentStream = Rx.Observable.from(data_array);
parentStream.map(url => {
return Rx.Observable.defer(() => {
return scrape(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]);
})
})
.concatAll()
.subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete'));
function scrape(url, selector, scope) {
return new Promise(
(resolve, reject) => x(
url,
selector,
scope
)((error, result) => error != null ? reject(error) : resolve(result))
);
}