2014-12-05 52 views
4

我有一个带有实时数据的流,以及基本上分隔属于一起的实时数据部分的流。现在当有人订阅实时数据流时,我想重播他们的实时数据。但是我不想记住所有的实时数据,只有自上一次其他流发出值以来的部分。通过可观察限制重播缓冲区

There is an issue这将解决我的问题,因为有一个重播操作符正是我想要的(或至少我认为)。

目前如何轻松完成此操作?有没有比以下更好的方法?

private class ReplayWithLimitObservable<TItem, TDelimiter> : IConnectableObservable<TItem> 
{ 
    private readonly List<TItem> cached = new List<TItem>(); 
    private readonly IObservable<TDelimiter> delimitersObservable; 
    private readonly IObservable<TItem> itemsObservable; 
    public ReplayWithLimitObservable(IObservable<TItem> itemsObservable, IObservable<TDelimiter> delimitersObservable) 
    { 
     this.itemsObservable = itemsObservable; 
     this.delimitersObservable = delimitersObservable; 
    } 

    public IDisposable Subscribe(IObserver<TItem> observer) 
    { 
     lock (cached) 
     { 
      cached.ForEach(observer.OnNext); 
     } 

     return itemsObservable.Subscribe(observer); 
    } 

    public IDisposable Connect() 
    { 
     var delimiters = delimitersObservable.Subscribe(
      p => 
       { 
        lock (cached) 
        { 
         cached.Clear(); 
        } 
       }); 
     var items = itemsObservable.Subscribe(
      p => 
       { 
        lock (cached) 
        { 
         cached.Add(p); 
        } 
       }); 
     return Disposable.Create(
      () => 
       { 
        items.Dispose(); 
        delimiters.Dispose(); 
        lock (cached) 
        { 
         cached.Clear(); 
        } 
      }); 
} 

public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters) 
{ 
    return new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters); 
} 
+0

只是一个想法的优势......不会'ConcurrentBag '是更好的选择比使用''上缓存lock' '?我的意思是,这就是它的设计目的...... – toadflakz 2014-12-05 09:25:02

+1

@toadflakz - AFAIK,ConcurrentBag不保证保留添加顺序(如果项目可观察行为正确,我使用列表以正确顺序获取项目)。 ConcurrentQueue可以解决这个问题,但清除列表比清除ConcurrentQueue更容易。 – 2014-12-05 09:29:22

+0

感谢您的解释 - 我对实时数据开发感兴趣,因此有经验的人员对代码设计的决策见解表示赞赏。 – toadflakz 2014-12-05 09:34:15

回答

4

这是做你想做的吗?它使所有的锁定和竞争条件到Rx利弊:)

private class ReplayWithLimitObservable<T, TDelimiter> : IConnectableObservable<T> 
{ 
    private IConnectableObservable<IObservable<T>> _source; 

    public ReplayWithLimitObservable(IObservable<T> source, IObservable<TDelimiter> delimiter) 
    { 
    _source = source 
     .Window(delimiter) // new replay window on delimiter 
     .Select<IObservable<T>,IObservable<T>>(window => 
     { 
     var replayWindow = window.Replay(); 

     // immediately connect and start memorizing values 
     replayWindow.Connect(); 

     return replayWindow; 
     }) 
     .Replay(1); // remember the latest window 
    } 

    IDisposable Connect() 
    { 
    return _source.Connect(); 
    } 

    IDisposable Subscribe(IObserver<T> observer) 
    { 
    return _source 
     .Concat() 
     .Subscribe(observer); 
    } 
} 

public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters) 
{ 
    return new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters); 
} 
+0

这样比较好,但我仍然不喜欢'IConnectableObservable '必须实施的事实。这不应该是真的,你不觉得吗?你在这里很接近避免它,但仍然是迄今为止...... ;-) – 2014-12-06 00:44:45

+0

至少,[this](https://github.com/Reactive-Extensions/Rx.NET/issues/54) ,但“ReplaySubject ”的“无功/被动”版本将是最好的。它可以在一行代码中解决这个问题:'return xs.Multicast(new ReplaySubject (ys));' – 2014-12-06 00:47:36

+0

@DaveSexton - 完全同意,我对这个问题的反应是'IConnectableObservable'的工厂确实是对Rx的欢迎补充。 – 2014-12-06 09:21:45