我的问题与Rx和Catch运算符有关。比方说,我有我的可观察的超时和每次超时发生我想重新创建底层观察(Catch)并做同样的事情(添加超时和捕获)。无限延伸中的无限捕获
下面我粘贴了示例代码。为了这个例子的目的,Timeout每2秒发生一次。从我的观察来看,这段代码无法无限工作,不知何故,在娱乐之后,某些事物正在引用旧的可观察的剩菜。当Catch被调用时,那些剩余物会累积。
大多数可疑行是最后一行,是某种自引用存在。但我实际上无法想象自己为什么它可能是错的?还有什么办法可以用类似的逻辑来创建可以永久工作的可观察事物吗?
public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable)
{
return targetObservable
.Timeout(TimeSpan.FromSeconds(2))
.Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable));
}
private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable)
{
GC.Collect(); // For debug - make sure all unreferenced object are removed
return recreateObservable()
.Timeout(TimeSpan.FromSeconds(2))
.Catch<string, Exception>(ex => ReconnectOnError(ex, recreateObservable));
}
这是一个.NET的问题或Java的问题吗?我很困惑的rx-java标签... –
这是更接近Rx的问题,所以rx-net和rx-java都很好。只有代码示例是在C#中 –