8

我有点新的Rx.NET。是否有可能捕获任何订户可能抛出的异常?采取以下...捕获异常可以从订阅抛出OnNext行动

handler.FooStream.Subscribe(
      _ => throw new Exception("Bar"), 
      _ => { }); 

目前,我正在赶上每订阅基础与以下的一个实例。它的实现只是使用ManualResetEvent来唤醒一个等待线程。

public interface IExceptionCatcher 
{ 
    Action<T> Exec<T>(Action<T> action); 
} 

和像这样使用它......

handler.FooStream.Subscribe(
      _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred 
      _ => { }); 

我觉得必须有一些更好的办法。 Rx.NET中的所有错误处理功能是否专门用于处理可疑错误?

编辑:每请求,我的实现是https://gist.github.com/1409829(接口和实现在PROD代码分离成不同的组件)。欢迎反馈。这可能看起来很愚蠢,但我使用windsor城堡来管理许多不同的Rx用户。

var exceptionCatcher = 
    new ExceptionCatcher(e => 
           { 
            Logger.FatalException(
             "Exception caught, shutting down.", e); 
            // Deal with unmanaged resources here 
           }, false); 


/* 
* Normally the code below exists in some class managed by an IoC container. 
* 'catcher' would be provided by the container. 
*/ 
observable /* do some filtering, selecting, grouping etc */ 
    .SubscribeWithExceptionCatching(processItems, catcher); 

回答

8

内置的可观察运营商不这样做你是什么:此异常捕手与这样

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher)); 

容器然后,它将是这样用在observable是的IObservable的实例注册默认情况下要求(很像事件),但是你可以做一个扩展方法来做到这一点。

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
           this IObservable<T> source 
           ) where TException : Exception 
{ 
    return Observable.CreateWithDisposable<T>(
     o => source.Subscribe(
      v => { try { o.OnNext(v); } 
        catch (TException) { } 
      }, 
      ex => o.OnError(ex), 
      () => o.OnCompleted() 
      )); 
} 

然后,可以通过此方法包装任何可观察值以获得您描述的行为。

+0

谢谢,你回答我的问题,但你确信周围OnNext你的try/catch将捕获异常?人们可以很容易地用返回的IObservable做些事情,这会导致订阅的代码在另一个线程上执行。我最初尝试在我的Subject.OnNext调用中放置try/catch,但是没有捕获到异常。然而,我可以创建一个SubscribeWithExceptionHandling方法或其他东西。 – drstevens

+1

@drstevens它会从同一个线程捕获异常。如果你的观察者正在启动它自己的异常操作来抛出异常,这将不会捕获这些异常。 –

+1

考虑到有多少Rx操作导致新线程(或池中的任务),我想说这很可能是这种情况。在'handler.FooStream'和'Subscribe'之间是一个'GroupByUntil(...)。SelectMany(...)。Buffer(有时间)'。我实际上最终创建了一个'SubscribeWithCatch',它跟随您的示例捕获异常,然后使用传递给OnError处理程序的相同操作。 – drstevens