2013-02-10 57 views
0

我很难理解主题对象。无功扩展,主题<T>

考虑下面的代码:

 var sub = new Subject<int>(); 
     sub.Subscribe(x => Console.WriteLine(x)); //subscriber #1   
     sub.Subscribe(x => Console.WriteLine(x)); //subscriber #2   
     sub.OnNext(2); 

我创建int类型的题目,当我执行OnNext它调用其它付费用户(#1和#2)。 我没有得到的东西是我读的那个主体意味着一个对象,它既是可观察的,又是观察者,但是这是如何解释为什么当我调用OnNext时,其他用户被调用。

我会理解,如果主题的OnNext会将其传播给所有订阅者=发布给所有其他人(这是有道理的),但是当我检查源代码时,我看不到任何内容,请参阅下文。

有人可能从下面的代码了解究竟是什么使OnNext(2)传播到其他订阅? (#1,#2)?

公共密封类主题:ISubject,ISubject,IObserver,的IObservable,IDisposable的 {// 领域 私人挥发性IObserver _observer;

// Methods 
public Subject() 
{ 
    this._observer = NopObserver<T>.Instance; 
} 

public void Dispose() 
{ 
    this._observer = DisposedObserver<T>.Instance; 
} 

public void OnCompleted() 
{ 
    IObserver<T> comparand = null; 
    IObserver<T> completed = DoneObserver<T>.Completed; 
    do 
    { 
     comparand = this._observer; 
    } 
    while (((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>)) && (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, completed, comparand) != comparand)); 
    comparand.OnCompleted(); 
} 

public void OnError(Exception error) 
{ 
    if (error == null) 
    { 
     throw new ArgumentNullException("error"); 
    } 
    IObserver<T> comparand = null; 
    DoneObserver<T> observer3 = new DoneObserver<T> { 
     Exception = error 
    }; 
    DoneObserver<T> observer2 = observer3; 
    do 
    { 
     comparand = this._observer; 
    } 
    while (((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>)) && (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, observer2, comparand) != comparand)); 
    comparand.OnError(error); 
} 

public void OnNext(T value) 
{ 
    this._observer.OnNext(value); 
} 

public IDisposable Subscribe(IObserver<T> observer) 
{ 
    if (observer == null) 
    { 
     throw new ArgumentNullException("observer"); 
    } 
    IObserver<T> comparand = null; 
    IObserver<T> observer3 = null; 
    do 
    { 
     comparand = this._observer; 
     if (comparand == DisposedObserver<T>.Instance) 
     { 
      throw new ObjectDisposedException(""); 
     } 
     if (comparand == DoneObserver<T>.Completed) 
     { 
      observer.OnCompleted(); 
      return Disposable.Empty; 
     } 
     DoneObserver<T> observer4 = comparand as DoneObserver<T>; 
     if (observer4 != null) 
     { 
      observer.OnError(observer4.Exception); 
      return Disposable.Empty; 
     } 
     if (comparand == NopObserver<T>.Instance) 
     { 
      observer3 = observer; 
     } 
     else 
     { 
      Observer<T> observer5 = comparand as Observer<T>; 
      if (observer5 != null) 
      { 
       observer3 = observer5.Add(observer); 
      } 
      else 
      { 
       observer3 = new Observer<T>(new ImmutableList<IObserver<T>>(new IObserver<T>[] { comparand, observer })); 
      } 
     } 
    } 
    while (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, observer3, comparand) != comparand); 
    return new Subscription<T>((Subject<T>) this, observer); 
} 

private void Unsubscribe(IObserver<T> observer) 
{ 
    IObserver<T> comparand = null; 
    IObserver<T> instance = null; 
Label_0004: 
    comparand = this._observer; 
    if ((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>)) 
    { 
     Observer<T> observer4 = comparand as Observer<T>; 
     if (observer4 != null) 
     { 
      instance = observer4.Remove(observer); 
     } 
     else 
     { 
      if (comparand != observer) 
      { 
       return; 
      } 
      instance = NopObserver<T>.Instance; 
     } 
     if (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, instance, comparand) != comparand) 
     { 
      goto Label_0004; 
     } 
    } 
} 

// Properties 
public bool HasObservers 
{ 
    get 
    { 
     return (((this._observer != NopObserver<T>.Instance) && !(this._observer is DoneObserver<T>)) && (this._observer != DisposedObserver<T>.Instance)); 
    } 
} 

// Nested Types 
private class Subscription : IDisposable 
{ 
    // Fields 
    private IObserver<T> _observer; 
    private Subject<T> _subject; 

    // Methods 
    public Subscription(Subject<T> subject, IObserver<T> observer) 
    { 
     this._subject = subject; 
     this._observer = observer; 
    } 

    public void Dispose() 
    { 
     IObserver<T> observer = Interlocked.Exchange<IObserver<T>>(ref this._observer, null); 
     if (observer != null) 
     { 
      this._subject.Unsubscribe(observer); 
      this._subject = null; 
     } 
    } 
} 

}

+0

顺便说一句,在复杂这里是[接收1.1/2.0的性能优化]的一部分(http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions- v2-0-β-可供now.aspx)。 (§提高生产者速度)。 – 2014-09-09 20:13:03

回答

0

主题是可观察到的,因为你可以订阅。你用你的例子来做(你没有订阅两个订阅者)。

主题还是一个观察者,因为你可以做到以下几点:

someObservable.Subscribe(subject); 

这样,你的主题将someObservable接收事件,并将其传播给自己的用户。

P.S.在你的代码中,你自己调用OnNext()方法。但是,这正是someObservable当你与你的主题订阅时所要做的。

+0

看到我的答案,它总结了一些。无论如何感谢Thanx。 – Gilad 2013-02-10 17:58:06

+0

对不起,评论错误的地方... – 2014-09-09 20:12:20

1

我知道,但困扰我的是它没有任何意义。我深入研究代码,发现他们内部的观察者实现包含更多的观察者,见下文。

如果你检查OnNext方法,你可以看到他们遍历所有观察者并调用他们的OnNext方法。

现在一切都对我有意义,我理解了逻辑,但看不到它在哪里实现。

internal class Observer<T> : IObserver<T> 
{ 
    private readonly ImmutableList<IObserver<T>> _observers; 

    public Observer(ImmutableList<IObserver<T>> observers) 
    { 
     this._observers = observers; 
    } 

    internal IObserver<T> Add(IObserver<T> observer) 
    { 
     return new Observer<T>(this._observers.Add(observer)); 
    } 

    public void OnCompleted() 
    { 
     foreach (IObserver<T> observer in this._observers.Data) 
     { 
      observer.OnCompleted(); 
     } 
    } 

    public void OnError(Exception error) 
    { 
     foreach (IObserver<T> observer in this._observers.Data) 
     { 
      observer.OnError(error); 
     } 
    } 

    public void OnNext(T value) 
    { 
     foreach (IObserver<T> observer in this._observers.Data) 
     { 
      observer.OnNext(value); 
     } 
    } 

    internal IObserver<T> Remove(IObserver<T> observer) 
    { 
     int index = Array.IndexOf<IObserver<T>>(this._observers.Data, observer); 
     if (index < 0) 
     { 
      return this; 
     } 
     if (this._observers.Data.Length == 2) 
     { 
      return this._observers.Data[1 - index]; 
     } 
     return new Observer<T>(this._observers.Remove(observer)); 
    } 
}