4

在一个C#控制台应用程序中,使用System.Reactive.Linq,我试图做一个observable,其中每个项目是由另一个observable进行某些处理的字符串结果。 我用字符串和字符创建了一个简单的repro。 警告,这个例子完全是CONTRIVED,重点是嵌套的.Wait()挂起。嵌套的Observable挂在Wait()

class Program 
{ 
    static void Main(string[] args) 
    { 
     string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" }; 
     IObservable<string> files = fileNames.ToObservable(); 
     string[] extensions = files.Select(fn => 
     { 
      var extension = fn.ToObservable() 
      .TakeLast(4) 
      .ToArray() 
      .Wait(); // <<<<<<<<<<<<< HANG HERE 
      return new string(extension); 
     }) 
     .ToArray() 
     .Wait(); 
    } 
} 

同样,这不是我如何找到许多文件名的后缀。 问题是我如何能够生成一个可观察的字符串,其中的字符串是从完整的可观察值计算出来的。

如果我拉出这段代码并单独运行它,它会很好地完成。

 var extension = fn.ToObservable() 
     .TakeLast(4) 
     .ToArray() 
     .Wait(); 

有一些关于异步方法的嵌套的Wait(),我不明白。

如何编码嵌套的异步observables,所以我可以产生一个简单的字符串数组?

感谢

-John

+2

你的方法是**不是**异步:Wait()是一个阻塞操作... 请参阅https://stackoverflow.com/questions/37801699/what-does-the-wait-operator-in-rx -net-do –

+0

你能解释一下为什么你首先使用'Wait()'? –

回答

4

为什么你的代码是阻塞的原因是因为你使用ToObservable()不指定调度。在这种情况下,它将使用CurrentThreadScheduler

因此,files可观察到的问题,它是第一个OnNext() [A](发送"file1.doxc")使用当前线程。它不能继续迭代,直到OnNext()返回。然而,内fn观察到使用ToObservable()Wait()块,直到fn完成 - 这将队列中的第一OnNext()(发送"f")当前线程调度程序,但它永远无法发送,因为现在第一OnNext() [A ]永远不会回来。

两个简单的修正:

要么改变files观察到这样的:

IObservable<string> files = fileNames.ToObservable(NewThreadScheduler.Default); 

或者避免使用内Wait()的一个SelectMany(这肯定是更地道的Rx):

string[] extensions = files.SelectMany(fn => 
{ 
    return fn.ToObservable() 
      .TakeLast(4) 
      .ToArray() 
      .Select(x => new string(x)); 
}) 
.ToArray() 
.Wait(); 

// display results etc. 

每种方法都会有完全不同的执行语义 - 第一种方法的运行方式非常像嵌套循环,每个内部可观察值在下一次外部迭代之前完成。由于Wait()的阻止行为被删除,第二个将更加交错。如果您使用我编写的Spy方法,并在ToObservable()调用后附加它,则您会很清楚地看到此行为。

+2

谢谢詹姆斯,很好的回答,点和信息。我没有充分的理由不使用SelectMany,实际上这是我现在最终的解决方案。我尝试添加NewThreadScheduler.Default仅用于测试,当然也是如此。我认为观察对象在OnNext上被有效地锁定了,但我想我会与间谍一起玩,并试图让自己更清楚。再次感谢! – JohnKoz

1

Wait是一个阻塞调用,这并不其中Rx拌匀。我不知道为什么嵌套的失败。

假设一个异步函数,这个工程:

IObservable<string> files = fileNames.ToObservable(); 
string[] extensions = await files.SelectMany(async fn => 
{ 
    var extension = await fn.ToObservable() 
    .TakeLast(4) 
    .ToArray(); 
     return new string(extension); 
}) 
.ToArray(); 
1

詹姆斯已经钉了问题,但我建议你的代码疖做,只是这样做:

string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" }; 
    string[] extensions = 
    (
     from fn in fileNames.ToObservable() 
     from extension in fn.ToObservable().TakeLast(4).ToArray() 
     select new string(extension) 
    ) 
     .ToArray() 
     .Wait(); 

现在,仍然有它.Wait()。理想情况下,你会这样做:

IDisposable subscription = 
    (
     from fn in fileNames.ToObservable() 
     from extension in fn.ToObservable().TakeLast(4).ToArray() 
     select new string(extension) 
    ) 
     .ToArray() 
     .Subscribe(extensions => 
     { 
      /* Do something with the `extensions` */ 
     }); 

你应该避免所有的等待。