2015-11-03 69 views
2

RxJava buffer example缓冲流描述了期望的结果完美:如何使用fromWebSocket主题

在突发期间收集在缓冲器项,并在每个脉冲串的端部发射它们,通过使用去抖操作员发出缓冲关闭指示灯,以缓冲算

编辑:在审查How to create a RxJS buffer that groups elements in NodeJS but that does not rely on forever running interval?,我的问题似乎是使用主题,而不是直接可观测相关

使用套接字流,以产生窗口关闭事件(如下)在2个插槽打开且无任何事件流了出来:

ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver); 
var closer = ws.flatMapFirst(Rx.Observable.timer(250)); 
ws.buffer(closer) 
    .subscribe(function(e) { console.log(e, 'socket messages');}); 
+0

问题是什么? – user3743222

+0

让我们从'如何在bufferClosingSelector中引用源观察本身'开始。我在这个问题上找不到任何例子。 – mygzi

+0

http://stackoverflow.com/questions/33402737/how-to-create-a-rxjs-buffer-that-groups-elements-in-nodejs-but-that-does-not-rel – user3743222

回答

2

总结发现的问题在这里:

  • Rx.DOM.fromWebSocket返回Rx.subject围绕WebSocket的包装。这个主题是由一个观察者和一个观察者组成的(通过new Rx.Subject(observer, observable)。根据我的理解,观察者允许通过其onNext方法向套接字写入,而观察者允许从套接字读取
  • 您总是阅读该主题是很热的来源,但显然这里只是意味着观察者会立即将它的值推送到这里将它推到插座上的主题。在正常情况下(new Rx.Subject()),默认观察者和可观察者是这样的,以使观察者可以听到观察者,因此默认的observable很热,但是在这里,observable是一个冷源,然后任何订阅都会重新执行创建另一个websocket的回调函数,因此创建了两个套接字。,因为创建的(冷)观察值是共享的(通过publish().refCount())。
  • 因此,在这里做同样的事情,可以解决重复问题。这意味着在这种特殊情况下,在您的代码ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver).share();,share中使用publish().refCount()作为别名。
  • 我想知道的Rx.DOM.fromWebSocket该行为是否应该被报告为错误

代码两种方法:

+0

您可以在这里包括实际的解决方法,例如: source = Rx.DOM.fromWebSocket(wsURI,null,wsOpenObserver,wsCloseObserver).share() – mygzi

+0

当然,就是这样。 – user3743222

0

您可以直接传递一个Observablebuffer操作就像RxJava版本:

source.buffer(source.debounce(150)) 

是有效的。见here

使用显示的选择器方法的替代语法将在每次缓冲区关闭时调用该方法,然后订阅它生成的Observable。

此外,RxJava示例中的去抖动正在发出缓冲区运算符的结果,因此不会默认发出累积结果。