2015-04-03 81 views
2

我想调用observable中每个项目的异步函数。如回答here,解决方案是使用SelectMany。但是,如果异步方法抛出,订阅将终止。我有以下解决方案,这似乎工作:Rx:使用异步函数进行订阅并忽略错误

obs.SelectMany(x => Observable 
    .FromAsync(() => RunAsync()) 
    .Catch(Observable.Empty<string>())); 

是否有更多的习惯解决方案?

回答

1

有一种标准的方式,以便能够观察到发生在您的通话RunAsync例外,而这使用.Materialize()

.Materialize()方法变成一个IObservable<T>序列为IObservable<Notification<T>>序列在那里你可以推理对OnNextOnErrorOnCompleted电话。

我写此查询:

var obs = Observable.Range(0, 10); 

obs 
    .SelectMany(x => 
     Observable 
      .FromAsync(() => RunAsync()) 
      .Materialize()) 
    .Where(x => x.Kind != NotificationKind.OnCompleted) 
    .Select(x => x.HasValue ? x.Value : (x.Exception.Message + "!")) 
    .Subscribe(x => x.Dump()); 

有了这个支持代码:

private int counter = 0; 
private Random rnd = new Random(); 

private System.Threading.Tasks.Task<string> RunAsync() 
{ 
    return System.Threading.Tasks.Task.Factory.StartNew(() => 
    { 
     System.Threading.Interlocked.Increment(ref counter); 
     if (rnd.NextDouble() < 0.3) 
     { 
      throw new Exception(counter.ToString()); 
     } 
     return counter.ToString(); 
    }); 
} 

当我运行它,我得到这样的输出:

2 
4 
5 
1! 
6 
7 
3! 
10 
8! 
9 

每条线结束于!是致电RunAsync,导致一个例外。

0

您也可以使用OnErrorResumeNext。

obs.SelectMany(x => Observable 
    .FromAsync(() => RunAsync()) 
    .OnErrorResumeNext(Observable.Empty<string>())); 
+0

没错,但Catch让我有机会记录异常。 – 2015-04-03 10:22:14

+0

这是真的,但也可以用Do()来实现。 无论如何,catch可能是更好的选择,因为OnErrorResumeNext在成功终止的情况下也会执行空序列。 – treze 2015-04-03 10:47:49