0
我必须同步订阅类似地实现对这段代码可观察到的数据序列:反应扩展:转换同步订阅异步
integers.Where(id => (id & 1) == 0).Subscribe(id => evenHandler.LRP(id, RandomDelay()));
integers.Where(id => (id & 1) == 1).Subscribe(id => oddHandler.LRP(id, RandomDelay()));
的问题是操作阻塞,所以你有一个结束情况是这样的:
OddHandler running with ID 1 for 331ms...
EvenHandler running with ID 2 for 651ms...
OddHandler running with ID 3 for 391ms...
EvenHandler running with ID 4 for 633ms...
OddHandler running with ID 5 for 197ms...
我想操作异步运行,我没有访问IHandler
接口暴露的方法LRP
。之后做一些研究,它似乎至少有两种方法可以做到这一点使用RX:
方法1:
integers.Where(id => (id & 1) == 0)
.Select(async id => await Task.Run(() => evenHandler.LRP(id, RandomDelay())))
.Subscribe();
integers.Where(id => (id & 1) == 1)
.Select(async id => await Task.Run(() => oddHandler.LRP(id, RandomDelay())))
.Subscribe();
方法2:
integers.Where(id => (id & 1) == 0)
.SelectMany(id => Observable.StartAsync(
async() => await Task.Run(() => evenHandler.LRP(id, RandomDelay())))
).Subscribe();
integers.Where(id => (id & 1) == 1)
.SelectMany(id => Observable.StartAsync(
async() => await Task.Run(() => oddHandler.LRP(id, RandomDelay())))
).Subscribe();
出现产生两种方法相同的结果。
我的问题:
- 哪两种技术之间的差异?
- 完成我需要做什么的建议方法是什么?