我有一个问题A cache serving updates and new values as “DistinctLatest” and full cache contents upon subscription,这是由社区处理。有人提出了一个问题,即上述问题中定义的缓存和替换值的实际目标可以用.DistinctLatest
运算符来定义。应该如何去实现Rx中的DistinctLatest(和缓存)操作符?
行!似乎没有太多关于这样的运营商的讨论。在搜索和思考时,我发现ReactiveX: Group and Buffer only last item in each group,这是非常接近的。为了模拟天生原来的问题,我试着写缓存操作者
/// <summary>
/// A cache that keeps distinct elements where the elements are replaced by the latest.
/// </summary>
/// <typeparam name="T">The type of the result</typeparam>
/// <typeparam name="TKey">The type of the selector key for distinct results.</typeparam>
/// <param name="newElements">The sequence of new elements.</param>
/// <param name="seedElements">The seed elements when the cache is started.</param>
/// <param name="replacementSelector">The replacement selector to choose distinct elements in the cache.</param>
/// <returns>The cache contents upon first call and changes thereafter.</returns>
public static IObservable<T> Cache<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
{
var s = newElements.StartWith(seedElements).GroupBy(replacementSelector).Select(groupObservable =>
{
var replaySubject = new ReplaySubject<T>(1);
groupObservable.Subscribe(value => replaySubject.OnNext(value));
return replaySubject;
});
return s.SelectMany(i => i);
}
但这样做的测试似乎并不要么做的伎俩。看起来好像是在开始时订阅了初始值和更新(以及新值)。而如果最后订阅了一个,则只记录替换的种子值。
现在,我想知道一般的DistinctLast
操作符,我认为这个,但它不起作用,然后这个“缓存”添加的是种子值和组的扁平化,但这不是测试告诉。我也尝试过一些分组和.TakeLast()
,但没有骰子。
如果有人有指点或思考这个问题,我很高兴,并希望这通常是有益的。
如果你提供了一个失败的单元测试,那么社区可以实现运营商,使其通过。 (我认为这只是GroupBy + Replay(1) –
我会!我现在被绑在一个会议上(我想我会从链接的问题中进行测试) – Veksi