2011-02-07 63 views
0

我是新来的RX,我有我想要的场景工作良好,但在我看来,必须有一个更简单或更优雅的方式来实现这一点。我所拥有的是IObservable<T>,我想以这样一种方式订阅它,最后触发一个IObservable<U>,,触发一个异步操作,为每个它所看到的生成一个U.有没有更简单的方法让IObservable异步依赖另一个IObservable?

我有什么至今(即伟大工程,但似乎累赘)使用中间事件流是这样的:

public class Converter { 
    public event EventHandler<UArgs> UDone; 
    public IConnectableObservable<U> ToUs(IObservable<T> ts) { 
    var us = Observable.FromEvent<UArgs>(this, "UDone").Select(e => e.EventArgs.U).Replay(); 
    ts.Subscribe(t => Observable.Start(() => OnUDone(new U(t)))); 
    return us; 
    } 
    private void OnUDone(U u) { 
    var uDone = UDone; 
    if (uDone != null) { 
     uDone(this, u); 
    } 
    } 
} 

... 

var c = new Converter(); 
IConnectableObservable<T> ts = ...; 
var us = c.ToUs(ts); 
us.Connect(); 

... 

我敢肯定,我错过了一个更简单的方法吗这...

+0

您确定要在这里重播吗? – 2011-02-09 09:31:34

回答

1

SelectMany应该做你需要什么,拉平了IO<IO<T>>

Observable.Range(1, 10) 
     .Select(ii => Observable.Start(() => 
      string.Format("{0} {1}", ii, Thread.CurrentThread.ManagedThreadId))) 
     .SelectMany(id=>id) 
     .Subscribe(Console.WriteLine); 
+0

谢谢,斯科特。我对SelectMany了如指掌,但出于某种原因,我没有考虑这个问题,因为它没有考虑到(单项)可观察事物的可观察性。 – lesscode 2011-02-08 01:50:19

0

这正是SelectMany是:

IObservable<int> ts 

IObservable<string> us = ts.SelectMany(t => StartAsync(t)); 

us.Subscribe(u => 
    Console.WriteLine("StartAsync completed with {0}", u)); 

... 

private IObservable<string> StartAsync(int t) 
{ 
    return Observable.Return(t.ToString()) 
     .Delay(TimeSpan.FromSeconds(1)); 
} 

请记住,如果StartAsync具有可变完成时间,您可以在从输入值不同的顺序接收输出值。

相关问题