我有一个带有实时数据的流,以及基本上分隔属于一起的实时数据部分的流。现在当有人订阅实时数据流时,我想重播他们的实时数据。但是我不想记住所有的实时数据,只有自上一次其他流发出值以来的部分。通过可观察限制重播缓冲区
There is an issue这将解决我的问题,因为有一个重播操作符正是我想要的(或至少我认为)。
目前如何轻松完成此操作?有没有比以下更好的方法?
private class ReplayWithLimitObservable<TItem, TDelimiter> : IConnectableObservable<TItem>
{
private readonly List<TItem> cached = new List<TItem>();
private readonly IObservable<TDelimiter> delimitersObservable;
private readonly IObservable<TItem> itemsObservable;
public ReplayWithLimitObservable(IObservable<TItem> itemsObservable, IObservable<TDelimiter> delimitersObservable)
{
this.itemsObservable = itemsObservable;
this.delimitersObservable = delimitersObservable;
}
public IDisposable Subscribe(IObserver<TItem> observer)
{
lock (cached)
{
cached.ForEach(observer.OnNext);
}
return itemsObservable.Subscribe(observer);
}
public IDisposable Connect()
{
var delimiters = delimitersObservable.Subscribe(
p =>
{
lock (cached)
{
cached.Clear();
}
});
var items = itemsObservable.Subscribe(
p =>
{
lock (cached)
{
cached.Add(p);
}
});
return Disposable.Create(
() =>
{
items.Dispose();
delimiters.Dispose();
lock (cached)
{
cached.Clear();
}
});
}
public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters)
{
return new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters);
}
只是一个想法的优势......不会'ConcurrentBag'是更好的选择比使用''上缓存lock' '?我的意思是,这就是它的设计目的...... –
toadflakz
2014-12-05 09:25:02
@toadflakz - AFAIK,ConcurrentBag不保证保留添加顺序(如果项目可观察行为正确,我使用列表以正确顺序获取项目)。 ConcurrentQueue可以解决这个问题,但清除列表比清除ConcurrentQueue更容易。 – 2014-12-05 09:29:22
感谢您的解释 - 我对实时数据开发感兴趣,因此有经验的人员对代码设计的决策见解表示赞赏。 – toadflakz 2014-12-05 09:34:15