2016-11-10 76 views
0

缓冲多个订户rx.js观察到

var subject = new rx.Subject(); 
    var stream =  rx.Observable.fromEvent(blah, 'event') 
        .filter(blah) 
        .map(blah) 
        .subscribe(subject); 

       return subject; 

然后我通过受该会以不同的方式,并以不同的速度来处理该事件几种不同的处理程序。
所以我在每个处理器是

subject.subscribe(async function (x) { 
     const func = self[x.eventName]; 
     if (func) { 
      await eventHandlerWrapper(self.handlerName, func, x); 
     } 
     }) 

我有两个问题, 一)如果事件进来超快速的处理程序要同步处理它们在给我的方式,正确的顺序它? b)如果不同的处理程序以不同的速度处理事件,他们将等到最慢的处理程序完成后再提供下一个事件?还是他们会按照自己的节奏缓冲和处理?

谢谢你, [R

回答

1

首先,对象的创建可以简化如下:

const subject = rx.Observable.fromEvent(blah, 'event') 
       .filter(blah) 
       .map(blah) 
       .share(); 

份额方法将创建从流的主题。如果您将此主题实例返回给每个订阅者,您将获得相同的行为,并且看起来更好。

a) if the events come in super fast is the handler going to process 
them synchronously and in the right order given the way I have it? 

事件将按照正确的顺序依次通过整个链。意思是,在处理下一个值之前,通过'fromEvent'进入的事件将被贯穿整个链,直到您订阅它的点为止(除非有一个异步操作符:))。 Ben Lesh在角度连接2015解释了这一点:https://www.youtube.com/watch?v=KOOT7BArVHQ(你可以观察整个谈话,但是它大约在17分钟左右,他将阵列与观测值进行比较)。

b) if the different handlers handle the event at different speeds are 
they all going to wait till the slowest handler is through before the  
next event is provided? or will they all sort of buffer and handle at 
they're own pace? 

他们会按照自己的步调处理事件。检查下面的例子:

let interval$ = Rx.Observable.interval(1000).share(); 

interval$.concatMap((val) => { 
    console.log('called'); 
    return Rx.Observable.of(val).delay(3000) 
    }) 
    .subscribe((val) => console.log("slow ", val)); 

interval$.subscribe((val) => console.log("fast ", val)); 

在这里我使用间隔可观察,我转换成一个主题。所以它会每秒发出一个事件。我有一个订阅正在接受一个值,处理这个值(需要2秒),然后接下一个(使用concatMap)。另一个订阅立即处理它们。如果你运行这个代码(jsbin在这里:https://jsbin.com/zekalab/edit?js,console),你会看到他们都按照自己的步调处理事件。

因此,他们不会等待最慢的处理程序,它会被内部缓冲。

如果最慢的处理器比发生事件的频率慢,那么您描述的情况可能会有潜在的危险情况。在那种情况下,你的缓冲区会不断增长,最终你的应用程序会崩溃。这是一个被称为背压的概念。你比事件处理速度快。在这种情况下,您需要在最慢的处理器上使用像'buffer'或'window'这样的运算符来避免这种情况。

+0

真棒。很好的答案。谢谢。我看着缓冲区,它似乎是分组事件。我需要的基本上是一个队列,最后我可以连接到主题,并将缓冲。缓冲工作吗? – Raif

+0

是的,应该这样做 – KwintenP