2017-04-21 74 views
0

我必须同步订阅类似地实现对这段代码可观察到的数据序列:反应扩展:转换同步订阅异步

integers.Where(id => (id & 1) == 0).Subscribe(id => evenHandler.LRP(id, RandomDelay())); 
integers.Where(id => (id & 1) == 1).Subscribe(id => oddHandler.LRP(id, RandomDelay())); 

的问题是操作阻塞,所以你有一个结束情况是这样的:

OddHandler running with ID 1 for 331ms... 
EvenHandler running with ID 2 for 651ms... 
OddHandler running with ID 3 for 391ms... 
EvenHandler running with ID 4 for 633ms... 
OddHandler running with ID 5 for 197ms... 

我想操作异步运行,我没有访问IHandler接口暴露的方法LRP。之后做一些研究,它似乎至少有两种方法可以做到这一点使用RX:

方法1:

integers.Where(id => (id & 1) == 0) 
    .Select(async id => await Task.Run(() => evenHandler.LRP(id, RandomDelay()))) 
    .Subscribe(); 

integers.Where(id => (id & 1) == 1) 
    .Select(async id => await Task.Run(() => oddHandler.LRP(id, RandomDelay()))) 
    .Subscribe(); 

方法2:

integers.Where(id => (id & 1) == 0) 
    .SelectMany(id => Observable.StartAsync(
     async() => await Task.Run(() => evenHandler.LRP(id, RandomDelay()))) 
    ).Subscribe(); 

integers.Where(id => (id & 1) == 1) 
    .SelectMany(id => Observable.StartAsync(
     async() => await Task.Run(() => oddHandler.LRP(id, RandomDelay()))) 
    ).Subscribe(); 

出现产生两种方法相同的结果。

我的问题:

  1. 哪两种技术之间的差异?
  2. 完成我需要做什么的建议方法是什么?

回答

0

我会远离方法1,因为它有点“打破”Rx。当你在Select里面await时,你正在做什么是异步工作的同步操作符。无论什么时候你想做一些异步的事情,把它放在SelectMany之内。 SelectMany甚至直接接受Task<T>

方法2你正在将Task包装在可观察的内部。我相信这与Observable.FromAsync()非常相似。然而,它有点笨重,创建一个Task,然后将其转换为可观察值;你可以只使用直接可观察到的:

integers.Where(id => (id & 1) == 0) 
    .SelectMany(id => Observable.Start(
     () => evenHandler.LRP(id, RandomDelay()), 
     ThreadPoolScheduler.Default)) 
    .Subscribe(); 

integers.Where(id => (id & 1) == 1) 
    .SelectMany(id => Observable.Start(
     () => oddHandler.LRP(id, RandomDelay()), 
     ThreadPoolScheduler.Default)) 
    .Subscribe(); 

这将创建一个可观察到当内工作完成后,返回一个Unit。我还指定它应该在TaskPoolScheduler上运行,这使它在Task内运行。