2011-11-14 29 views
6

我有一个事件,我无法控制哪个数据为我提供数据。 eventArgs看起来像这样:如何根据事件中的条件完成Rx Observable

class MyEventArg { 
    bool IsLastItem {get;} 
    Data DataItem {get;} 
} 

我使用Rx将此事件转换为IObservable。但是如果IsLastItem为true,我想完成observable。

任何优雅的想法?一种方法是通过一个主题管道数据,我有更多的控制来设置OnComplete事件,如果条件发生...

回答

9

如果你想在最后一个元素被列入你只能与最后一个元素连同常规流与TakeWhile合并合并流。 下面是一个简单的控制台应用程序来证明这一点:

var subject = new List<string> 
{        
"test", 
"last" 
}.ToObservable(); 

var my = subject 
      .Where(x => x == "last").Take(1) 
      .Merge(subject.TakeWhile(x => x != "last")); 

my.Subscribe(
    o => Console.WriteLine("On Next: " + o), 
    () => Console.WriteLine("Completed")); 

Console.ReadLine(); 

此打印:

On Next: test 
On Next: last 
Completed 

UPDATE 有是supressed的OnCompleted消息,如果底层的可观测其实并没有完整的错误。我纠正了代码,以确保OnCompleted被称为

如果你想避免冷观测多次订阅基础序列可以重构这样的代码:

var my = subject.Publish(p => p 
      .Where(x => x == "last").Take(1) 
      .Merge(p.TakeWhile(x => x != "last"))); 
+0

不错!花了我几秒钟来看看它是如何完成的。也许更好写:subject.TakeWhile(x => x!=“last”)。合并(subject.Where(x => x ==“last”)。Take(1)); 冷可观察的提示是+。谢谢 – lukebuehler

2

你在找这样的吗?

IObservable<MyEventArg> result = 
    myEventArgObservable.TakeWhile(arg => !arg.IsLastItem); 
+0

哇,那会简单而好看。如果谓词是真的,你知道Observable是否完成? – lukebuehler

+0

是的,它会通知OnCompleted(),如果你不想让它通知OnCompleted你可以简单地使用Where(arg =>!arg.IsLastItem) – Christoph

+1

是的,它好像Observable完成,但现在我有问题我不会收到最后一个项目... – lukebuehler

2
public static IObservable<TSource> TakeWhileInclusive<TSource>(
     this IObservable<TSource> source, Func<TSource, bool> predicate) 
{ 
    return Observable 
     .Create<TSource>(o => source.Subscribe(x => 
                { 
                 o.OnNext(x); 
                 if (!predicate(x)) 
                  o.OnCompleted(); 
                }, 
               o.OnError, 
               o.OnCompleted 
           )); 
}