2013-05-01 102 views
8

我有一个订阅流的Web组件。如何缓冲流事件?

由于每次显示Web组件时都会重新创建Web组件,因此我必须清理订户并重做它。

现在我将所有用户的列表,并在removed()生命周期的方法我在做:

subscriptions.forEach((sub) => sub.cancel()); 

现在的问题:当不显示Web组件,有没有一个听流。问题在于组件在未显示时缺少数据/事件。

我需要的是缓冲。侦听器注册时,事件需要缓冲并立即发送。 According to the documentation,缓冲发生,直到听众注册:

控制器将缓冲所有传入的事件,直到用户注册。

这工作,但问题是,听者会在某个时刻去除重新登记,看来这不会触发缓冲。

看起来,缓冲只发生在最初,即使所有的听众都不在,以后也不会发生。

所以问题是:如何缓冲这种情况下听众可能会消失又回来?

回答

11

注意:通常您不应该能够重新订阅已关闭的流。这似乎是我们忘记修复的错误。

我不熟悉web组件,但我希望我解决您的问题,并提出以下建议。

一种方法(当然也有很多)会为每个用户创建一个新的流(如html事件),以暂停原始流。

origin是原始流。然后实现一个stream获取器,该获取器返回链接到origin的新流:

未经测试的代码。

Stream origin; 
var _subscription; 
final _listeners = new Set<StreamController>(); 

_addListener(controller) { 
    _listeners.add(controller); 
    if (_subscription == null) { 
    _subscription = origin.listen((event) { 
     // When we emit the event we want listeners to be able to unsubscribe 
     // or add new listeners. In order to avoid ConcurrentModificationErrors 
     // we need to make sure that the _listeners set is not modified while 
     // we are iterating over it with forEach. Here we just create a copy with 
     // toList(). 
     // Alternatively (more efficient) we could also queue subscription 
     // modification requests and do them after the forEach. 
     _listeners.toList().forEach((c) => c.add(event)); 
    }); 
    } 
    _subscription.resume(); // Just in case it was paused. 
} 
_removeListener(controller) { 
    _listeners.remove(controller); 
    if (_listeners.isEmpty) _subscription.pause(); 
} 

Stream get stream { 
    var controller; 
    controller = new StreamController(
     onListen:() => _addListener(controller), 
     onCancel:() => _removeListener(controller)); 
    return controller.stream; 
} 

如果您需要缓冲的事件立即就需要开始订阅马上而不是懒洋洋地在示例代码。

+1

正是我想要的。谢谢!我想我会在'BufferedStreamController'(或其他)实现中包装它,因为我需要在几个地方使用它。 – 2013-05-02 12:17:01

+1

编辑代码:我忘记了同时修改。当一个事件发出时,我们希望听众能够订阅或取消订阅。 – 2013-05-07 07:30:56