2012-07-27 33 views
2

对于基于Rx的变化跟踪解决方案,我需要一个运算符,它可以以可观察序列获取第一个和最近一个项目。Rx:用于从Observable流获取第一个和最近值的运算符

我怎么会写会产生以下大理石图的Rx操作(注:括号只是用来阵容的项目......我不知道如何最好地在文本代表此):

 xs:---[a ]---[b ]-----[c ]-----[d ]---------| 
desired:---[a,a]---[a,b]-----[a,c]-----[a,d]---------| 

回答

5

使用相同的命名为@Wilka你可以用下面的扩展,是有点不言自明:

public static IObservable<TResult> FirstAndLatest<T, TResult>(this IObservable<T> source, Func<T,T,TResult> func) 
{ 
    var published = source.Publish().RefCount(); 
    var first = published.Take(1);   
    return first.CombineLatest(published, func); 
} 

注意,它不一定返回Tuple,而是为您提供在结果上传递选择器函数的选项。这使它与基本的主要操作(CombineLatest)保持一致。这显然很容易改变。

用法(如果你想的元组产生的数据流中):

Observable.Interval(TimeSpan.FromSeconds(0.1)) 
      .FirstAndLatest((a,b) => Tuple.Create(a,b)) 
      .Subscribe(Console.WriteLine); 
+3

您应该添加一个发布操作符来共享订阅源的副作用。上面的FirstAndLatest实现将导致两个订阅源为其每个订阅的结果,这可能会导致大量重复计算(或更糟糕的副作用,如启动I/O和什么)。 – 2012-07-28 22:13:35

+0

我接受这个答案,因为这是第一个正确答案,尽管Bart的评论让我想知道如何将Publish()整合到实现中。这不仅仅是在最后加上Publish()的问题。 – Damian 2012-07-29 00:01:37

+0

根据@BartDeSmet我添加了'Publish'(还添加了'RefCount',不知道这是否是首选方式,或者调用'Connect')。 – yamen 2012-07-29 21:20:47

1

我怀疑有这样做的更好的方法(和我不喜欢使用这样做),但你可以创建一个这样

public static IObservable<Tuple<T, T>> FirstAndLatest2<T>(this IObservable<T> source) 
{ 
    return Observable.Defer(() => { 
     bool hasFirst = false; 
     T first = default(T); 

     return source 
      .Do(item => 
      { 
       if (!hasFirst) 
       { 
        hasFirst = true; 
        first = item; 
       } 
      }) 
      .Select(current => Tuple.Create(first, current)); 
    }); 
} 

操作,那么你会用它LIK E本:

Observable.Interval(TimeSpan.FromSeconds(0.1)) 
    .FirstAndLatest() 
    .Subscribe(Console.WriteLine); 
+2

FirstAndLatest错误。由于缺乏懒惰,hasFirst和First状态在所有订阅中共享。所有的自定义操作符应该首先调用Observable.Create,除非它纯粹是一个现有操作符组合的“宏”。或者,您也可以在这里使用Observable.Defer以创建每个订阅状态。 – 2012-07-28 22:12:19

+0

谢谢,我错过了。我已更新我的答案以使用Observable.Defer – Wilka 2012-07-30 08:51:43

1

试试这个:

public static IObservable<Tuple<T, T>> FirstAndLatest<T>(
    this IObservable<T> source) 
{ 
    return 
     source 
      .Take(1) 
      .Repeat() 
      .Zip(source, (x0, xn) => Tuple.Create(x0, xn)); 
} 

简单,是吧?


或者,以共享底层源替代,试试这个:

public static IObservable<Tuple<T, T>> FirstAndLatest<T>(
    this IObservable<T> source) 
{ 
    return 
     source.Publish(
      s => 
       s.Take(1) 
       .Repeat() 
       .Zip(s, (x0, xn) => Tuple.Create(x0, xn))); 
} 

哎呦!抓住这个。它不起作用。它基本上不断产生一对最新值。这样发布不起作用。原始实施是最好的。

+1

您应该添加一个发布操作符来共享订阅源的副作用。上面的FirstAndLatest实现将导致两个订阅源为其每个订阅的结果,这可能会导致大量重复计算(或更糟糕的副作用,如启动I/O和什么)。 – 2012-07-28 22:14:07

+0

@BartDeSmet - 我故意用这种方式实现它。如果源观察值很热,那么我们是否不希望为每个订阅进行计算?不过,我会做出一个替代实施来避免这些问题。 – Enigmativity 2012-07-29 00:48:39

+0

@Enigmativity仍然认为'Take(1).CombineLatest'比Take(1).Repeat()。Zip'更合适。 – yamen 2012-07-30 20:39:27

相关问题