对于基于Rx的变化跟踪解决方案,我需要一个运算符,它可以以可观察序列获取第一个和最近一个项目。Rx:用于从Observable流获取第一个和最近值的运算符
我怎么会写会产生以下大理石图的Rx操作(注:括号只是用来阵容的项目......我不知道如何最好地在文本代表此):
xs:---[a ]---[b ]-----[c ]-----[d ]---------|
desired:---[a,a]---[a,b]-----[a,c]-----[a,d]---------|
对于基于Rx的变化跟踪解决方案,我需要一个运算符,它可以以可观察序列获取第一个和最近一个项目。Rx:用于从Observable流获取第一个和最近值的运算符
我怎么会写会产生以下大理石图的Rx操作(注:括号只是用来阵容的项目......我不知道如何最好地在文本代表此):
xs:---[a ]---[b ]-----[c ]-----[d ]---------|
desired:---[a,a]---[a,b]-----[a,c]-----[a,d]---------|
使用相同的命名为@Wilka你可以用下面的扩展,是有点不言自明:
public static IObservable<TResult> FirstAndLatest<T, TResult>(this IObservable<T> source, Func<T,T,TResult> func)
{
var published = source.Publish().RefCount();
var first = published.Take(1);
return first.CombineLatest(published, func);
}
注意,它不一定返回Tuple
,而是为您提供在结果上传递选择器函数的选项。这使它与基本的主要操作(CombineLatest
)保持一致。这显然很容易改变。
用法(如果你想的元组产生的数据流中):
Observable.Interval(TimeSpan.FromSeconds(0.1))
.FirstAndLatest((a,b) => Tuple.Create(a,b))
.Subscribe(Console.WriteLine);
我怀疑有这样做的更好的方法(和我不喜欢使用这样做),但你可以创建一个这样
public static IObservable<Tuple<T, T>> FirstAndLatest2<T>(this IObservable<T> source)
{
return Observable.Defer(() => {
bool hasFirst = false;
T first = default(T);
return source
.Do(item =>
{
if (!hasFirst)
{
hasFirst = true;
first = item;
}
})
.Select(current => Tuple.Create(first, current));
});
}
操作,那么你会用它LIK E本:
Observable.Interval(TimeSpan.FromSeconds(0.1))
.FirstAndLatest()
.Subscribe(Console.WriteLine);
FirstAndLatest错误。由于缺乏懒惰,hasFirst和First状态在所有订阅中共享。所有的自定义操作符应该首先调用Observable.Create,除非它纯粹是一个现有操作符组合的“宏”。或者,您也可以在这里使用Observable.Defer以创建每个订阅状态。 – 2012-07-28 22:12:19
谢谢,我错过了。我已更新我的答案以使用Observable.Defer – Wilka 2012-07-30 08:51:43
试试这个:
public static IObservable<Tuple<T, T>> FirstAndLatest<T>(
this IObservable<T> source)
{
return
source
.Take(1)
.Repeat()
.Zip(source, (x0, xn) => Tuple.Create(x0, xn));
}
简单,是吧?
或者,以共享底层源替代,试试这个:
public static IObservable<Tuple<T, T>> FirstAndLatest<T>(
this IObservable<T> source)
{
return
source.Publish(
s =>
s.Take(1)
.Repeat()
.Zip(s, (x0, xn) => Tuple.Create(x0, xn)));
}
哎呦!抓住这个。它不起作用。它基本上不断产生一对最新值。这样发布不起作用。原始实施是最好的。
您应该添加一个发布操作符来共享订阅源的副作用。上面的FirstAndLatest实现将导致两个订阅源为其每个订阅的结果,这可能会导致大量重复计算(或更糟糕的副作用,如启动I/O和什么)。 – 2012-07-28 22:14:07
@BartDeSmet - 我故意用这种方式实现它。如果源观察值很热,那么我们是否不希望为每个订阅进行计算?不过,我会做出一个替代实施来避免这些问题。 – Enigmativity 2012-07-29 00:48:39
@Enigmativity仍然认为'Take(1).CombineLatest'比Take(1).Repeat()。Zip'更合适。 – yamen 2012-07-30 20:39:27
您应该添加一个发布操作符来共享订阅源的副作用。上面的FirstAndLatest实现将导致两个订阅源为其每个订阅的结果,这可能会导致大量重复计算(或更糟糕的副作用,如启动I/O和什么)。 – 2012-07-28 22:13:35
我接受这个答案,因为这是第一个正确答案,尽管Bart的评论让我想知道如何将Publish()整合到实现中。这不仅仅是在最后加上Publish()的问题。 – Damian 2012-07-29 00:01:37
根据@BartDeSmet我添加了'Publish'(还添加了'RefCount',不知道这是否是首选方式,或者调用'Connect')。 – yamen 2012-07-29 21:20:47