2012-08-14 33 views
1

如何将MainEngine可观察转换为冷?从这个例子:热/冷可观察,乘以用户

public IObservable<int> MainEngine 
    { 
     get 
     { 
      Random rnd = new Random(); 
      int maxValue = rnd.Next(20); 
      System.Diagnostics.Trace.TraceInformation("Max value is: " + maxValue.ToString()); 

      return (from sinlgeInt in Enumerable.Range(0, maxValue) 
        select sinlgeInt).ToObservable(); 
     } 
    } 

    public void Main() 
    { 
     // 1 
     MainEngine.Subscribe(
       onNext: (item) => { System.Diagnostics.Trace.TraceInformation("Value is: " + item.ToString()); } 
     ); 

     // 2 
     MainEngine.Subscribe(
       onNext: (item) => { System.Diagnostics.Trace.TraceInformation("Gonna put it into XML: " + item.ToString()); } 
     ); 
    } 

问题1:在订户1和订户2我得到不同的结果,但我希望他们都收到相同的结果。

问题2:从我添加第二个订阅者的时间点开始,他们两个都继续收到相同的结果。

+0

因为这是安排在同一个线程上,所以没有区别。 (1)将在(2)开始之前完成。 – Asti 2012-08-14 17:32:05

+0

小的一点,它可能只发生,因为你从一个更复杂的现实世界的例子简化,但 “从Enumerable.Range(0,maxValue)sinlgeInt选择sinlgeInt” 可以替换为 “Enumerable.Range(0 ,maxValue)“ – 2013-02-06 15:47:32

回答

1

关于你的第一个问题,问题是观察者没有订阅相同的IObservable,因为你调用了两次getter。

分配IObservable一个局部变量似乎解决该问题:

IObservable<int> mainEngine = MainEngine; 

mainEngine.Subscribe(onNext: (item) => { /* ... */ }); 
mainEngine.Subscribe(onNext: (item) => { /* ... */ }); 

关于你提到的第二个问题,如果你想和大家分享一个subcription到一个IObservable,您可以使用Publish方法:

IConnectableObservable<int> published = MainEngine.Publish(); 

published.Subscribe(onNext: (item) => { Console.WriteLine(item + " on observer 1"); }); 
published.Subscribe(onNext: (item) => { Console.WriteLine(item + " on observer 2"); }); 

published.Connect(); 

两个用户随后会看到以交错的方式从IObservable结果:

0 on observer 1 
0 on observer 2 
1 on observer 1 
1 on observer 2 
etc. 

您也可以在订阅电话后订阅新的观察者,之后所有订阅者都将看到相同的事件。您可以修改您的例子来测试,通过运行您观察到的一个新的线程,并引入延迟:

public static void Main() 
{ 
    Random rnd = new Random(); 
    int maxValue = rnd.Next(20); 

    /* Zip with Observable.Interval to introduce a delay */ 
    IObservable<int> mainEngine = Observable.Range(0, maxValue, Scheduler.NewThread) 
     .Zip(Observable.Interval(TimeSpan.FromMilliseconds(100)), (a, b) => a); 

    /* Publish the observable to share a subscription between observers */ 
    IConnectableObservable<int> published = mainEngine.Publish(); 

    /* Subscribe the first observer immediately, events are not yet being observed */ 
    published.Subscribe(onNext: (item) => { Console.WriteLine(item + " on observer 1"); }); 

    /* Start pushing events to the first observer */ 
    published.Connect(); 

    /* Wait one second and then subscribe the second observer */ 
    Thread.Sleep(1000); 
    published.Subscribe(onNext: (item) => { Console.WriteLine(item + " on observer 2"); }); 

    Console.ReadKey(); 
} 

你会看到事件的二分之一的价值只在第一次观测,然后两个观察者会看到每个事件。

3

您的可观察值已经是cold。如果每次订阅它时获取observable的实例,都会得到相同的值。

,它看起来是是,如果你多次调用MainEngine你回来不同观察的情况下,唯一的办法。但是,这并不能使他们真的

实际上,您已创建了一个冷观察工厂

要使MainEngine方法真正热,你需要添加一个Defer调用,就像这样:

public IObservable<int> MainEngine 
{ 
    get 
    { 
     return Observable.Defer(() => 
     { 
      Random rnd = new Random(); 
      int maxValue = rnd.Next(20); 

      System.Diagnostics.Trace.TraceInformation(
       "Max value is: " + maxValue.ToString()); 

      return Observable.Range(0, maxValue); 
     }); 
    } 
} 

另外请注意,我改变Enumerable.RangeObservable.Range并取消呼叫.ToObservable()

为了现在实际上刁难这里做什么:

var hotObservable = MainEngine.Publish().RefCount(); 

这实际上意味着,当你在同一时间超过一个可观察的认购,他们将分享潜在的可观察的。当没有订阅时,底层的observable将消失,只有在新观察者订阅时才会创建。

请记住,您将MainEngine的实现默认为使用Scheduler.Immediate运行,因此,只有在将observable更改为在其他线程上运行之后,才能看到此代码的优点。

我希望这会有所帮助。