2010-02-13 44 views
3

我正在努力与我的第一个简单的“hello world”RX应用程序。我正在使用VS2010 RC,以及最新的RX下载。第一步与反应式扩展蹒跚步骤

以下是简单的控制台应用程序;

class Program 
    { 
     static void Main(string[] args) 
     { 

      var channel = new MessageChannel() 
       .Where(m => m.process) 
       .Subscribe((MyMessage m) => Console.WriteLine(m.subject)); 

      //channel.GenerateMsgs(); 
     } 
    } 

    public class MyMessage 
    { 
     public string subject; 
     public bool process; 
    } 

    public class MessageChannel: IObservable<MyMessage> 
    { 
     List<IObserver<MyMessage>> observers = new List<IObserver<MyMessage>>(); 

     public IDisposable Subscribe(IObserver<MyMessage> observer) 
     { 
      observers.Add(observer); 
      return observer as IDisposable; 
     } 

     public void GenerateMsgs() 
     { 
      foreach (IObserver<MyMessage> observer in observers) 
      { 
       observer.OnNext(new MyMessage() {subject = "Hello!", process = true}); 
      } 
     } 
    } 

我在Where子句中得到一个ArgumentNullException。这是堆栈;

System.ArgumentNullException was unhandled 
    Message=Value cannot be null. 
Parameter name: disposable 
    Source=System.Reactive 
    ParamName=disposable 
    StackTrace: 
     at System.Collections.Generic.AnonymousObservable`1.Disposable.Set(IDisposable disposable) 
     at System.Collections.Generic.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0() 
     at System.Threading.Scheduler.NowScheduler.Schedule(Action action) 
     at System.Collections.Generic.AnonymousObservable`1.Subscribe(IObserver`1 observer) 
     at ConsoleApplication1.Program.Main(String[] args) in C:\Users\Jason\documents\visual studio 2010\Projects\ConsoleApplication1\ConsoleApplication1\Program.cs:line 18 
     at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args) 
     at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args) 
     at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly() 
     at System.Threading.ThreadHelper.ThreadStart_Context(Object state) 
     at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx) 
     at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state) 
     at System.Threading.ThreadHelper.ThreadStart() 
    InnerException: 

回答

1

此行似乎引起大惊小怪:

return observer as IDisposable; 

你不应该承担的观察者是一次性的,你应该回到它知道“退订”的一次性对象。

该方法返回对IDSposable接口的引用。这使得 观察者能够在提供者完成之前取消订阅(即, 停止接收通知) 发送它们并且呼叫 订户的OnCompleted方法。

你可以把它用做类似工作:

public class MessageChannel: IObservable<MyMessage> 
{ 
    class Subscription : IDisposable { 
     MessageChannel _c; 
     IObservable<MyMessage> _obs; 
     public Subscription(MessageChannel c, IObservable<MyMessage> obs) { 
      _c = c; _obs = obs; 
     } 
     public void Dispose() { 
      _c.Unsubscribe(_obs); 
     } 
    } 

    public IDisposable Subscribe(IObserver<MyMessage> observer) 
    { 
     observers.Add(observer); 
     return new Subscription(this, observer); 
    } 

    void Unsubscribe(IObservable<MyMessage> obs) { 
     observers.Remove(obs); 
    } 
} 
+0

非常感谢弗兰克,那打在头上。只需要改变Subcribe类来使用IObserver而不是IObservable(我不抱怨!)。我使用了IObserverable提供的MSDN示例(http://msdn.microsoft.com/en-us/library/dd990377(VS.100).aspx) – 2010-02-13 19:33:33

+0

您也可以使用'Disposable.Create(()=> Unsubscribe(观察者)) - 'System.Reactive.Disposables'命名空间有许多有用的IDisposable实现 – AlexFoxGill 2013-03-28 09:19:12

1

!!红旗!

我强烈建议您自己不要执行IObserver<T>IObservable<T>。赞成使用Observable.Create<T>或作为最后的手段使用Subject类型。为了正确实现这些接口,需要考虑很多事情,这些接口由正确的Rx类型和运算符为您处理。

在这个例子中,我会劝你放弃的MessageChannel类型和交换它

class Program 
{ 
    static void Main(string[] args) 
    { 
     var channel = GenerateMsgs() 
      .Where(m => m.process) 
      .Subscribe((MyMessage m) => Console.WriteLine(m.subject)); 
    } 

    public IObservable<MyMessage> GenerateMsgs() 
    { 
     return Observable.Create<MyMessage>(observer=> 
     { 
      observer.OnNext(new MyMessage() {subject = "Hello!", process = true}); 
     }); 
    } 
} 

public class MyMessage 
{ 
    public string subject; 
    public bool process; 
} 

在系统设计的进一步检查,你可能有某种服务的一个公开的“通道”作为观察序列。

public interface OrderService 
{ 
    IObservable<OrderRequest> OrderRequests(); 
    IObservable<Order> ProcessedOrders(); 
    IObservable<OrderRejection> OrdersRejections(); 
} 

从而否定为IObserver<T>IObservable<T>这些自定义实现的需要。