2016-11-28 151 views
0

我的问题与Rx和Catch运算符有关。比方说,我有我的可观察的超时和每次超时发生我想重新创建底层观察(Catch)并做同样的事情(添加超时和捕获)。无限延伸中的无限捕获

下面我粘贴了示例代码。为了这个例子的目的,Timeout每2秒发生一次。从我的观察来看,这段代码无法无限工作,不知何故,在娱乐之后,某些事物正在引用旧的可观察的剩菜。当Catch被调用时,那些剩余物会累积。

大多数可疑行是最后一行,是某种自引用存在。但我实际上无法想象自己为什么它可能是错的?还有什么办法可以用类似的逻辑来创建可以永久工作的可观察事物吗?

public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable) 
    { 
     return targetObservable 
      .Timeout(TimeSpan.FromSeconds(2)) 
      .Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable)); 
    } 

    private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable) 
    { 
     GC.Collect(); // For debug - make sure all unreferenced object are removed 

     return recreateObservable() 
      .Timeout(TimeSpan.FromSeconds(2)) 
      .Catch<string, Exception>(ex => ReconnectOnError(ex, recreateObservable)); 
    } 
+1

这是一个.NET的问题或Java的问题吗?我很困惑的rx-java标签... –

+0

这是更接近Rx的问题,所以rx-net和rx-java都很好。只有代码示例是在C#中 –

回答

1

我想你只是想用Retry()操作符。

我假设你的初始序列和你的继续序列是一样的。

例如

Observable.Return(1).Concat(Observable.Throw<int>(new Exception())) 
    .Retry() 

这将运行在一个紧密的无限循环中。

您的代码可能最后看起来像个

createObservable() 
    .Timeout(TimeSpan.FromSeconds(2)) 
    .Retry() 
0

你可以做这样的事情。

//done in Linqpad, where async Main is allowed. 
async void Main() 
{ 
    var source = new Subject<string>(); 
    var backup = new Subject<string>(); 
    var reliableStream = source.CreateReliableStream(() => backup); 
    reliableStream.Subscribe(s => Console.WriteLine($"Next: {s}"), e => Console.WriteLine($"Error: {e.Message}"),() => Console.WriteLine("Completed.")); 

    source.OnNext("sourceAbc"); 
    backup.OnNext("backupAbc"); 
    await Task.Delay(TimeSpan.FromSeconds(2.5)); 

    source.OnNext("sourceDef"); 
    backup.OnNext("backupDef"); 
    await Task.Delay(TimeSpan.FromSeconds(2.5)); 

    //Doesn't yield "Completed" because it's re-subscribing. 
    source.OnCompleted(); 
    backup.OnCompleted(); 

} 

public static class Ex 
{ 
    public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable) 
    { 
     return targetObservable 
      .Timeout(TimeSpan.FromSeconds(2)) 
      .Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable)); 
    } 

    public static IEnumerable<IObservable<T>> InfiniteObservables<T>(Func<IObservable<T>> f) 
    { 
     while(true) 
      yield return f(); 
    } 

    private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable) 
    { 
     GC.Collect(); // For debug - make sure all unreferenced object are removed 

     return InfiniteObservables(recreateObservable) 
      .Select(o => o.Timeout(TimeSpan.FromSeconds(2))) 
      .OnErrorResumeNext(); 
    } 
} 

产生以下的输出:

Next: sourceAbc 
Next: sourceDef 
Next: backupHij 
Next: backupLmn 

我不是这种方法的粉丝虽然。 Rx将错误视为流终止符,并且您正在尝试将它们视为替代消息。你最终会像这样往上游去。