2015-07-13 77 views
3

我在学习RX,并想使用Console.ReadLine作为可观察序列的源。Console.ReadLine()传递给C#事件

我知道我可以使用“yield return”创建“IEnumerable”,但是对于我的具体用例,我决定创建一个C#事件,以便可能很多观察者可以共享相同的键盘输入。

这里是我的代码:

class Program 
{ 
    private delegate void OnNewInputLineHandler(string line); 

    private static event OnNewInputLineHandler OnNewInputLineEvent = _ => {}; 

    static void Main(string[] args) 
    { 
     Task.Run((Action) GetInput); 

     var input = ConsoleInput(); 
     input.Subscribe(s=>Console.WriteLine("1: " + s)); 

     Thread.Sleep(30000); 
    } 

    private static void GetInput() 
    { 
     while (true) 
      OnNewInputLineEvent(Console.ReadLine()); 
    } 

    private static IObservable<string> ConsoleInput() 
    { 
     return Observable.Create<string>(
     (IObserver<string> observer) => 
     { 
      OnNewInputLineHandler h = observer.OnNext; 
      OnNewInputLineEvent += h; 
      return Disposable.Create(() => { OnNewInputLineEvent -= h; }); 
     }); 
    } 
} 

我的问题 - 当我运行GetInput方法,因为它上面显示,第一个输入线不被发送到序列(但它发送到事件处理程序)。

但是,如果我用下面的版本来替换它,一切都按预期工作:

private static void GetInput() 
{ 
    while (true) 
    { 
     var s = Console.ReadLine(); 
     OnNewInputLineEvent(s); 
    } 
} 

可能有人能够解释为什么这件事会发生一些轻?

回答

4

你试图让自己难过。 Rx几乎总是有一种简单的方法。这只是一个学习的问题,而不是从程序的角度来思考。

这就是你需要:

class Program 
{ 
    static void Main(string[] args) 
    { 
     var subscription = ConsoleInput().Subscribe(s => Console.WriteLine("1: " + s)); 
     Thread.Sleep(30000); 
     subscription.Dispose(); 
    } 

    private static IObservable<string> ConsoleInput() 
    { 
     return 
      Observable 
       .FromAsync(() => Console.In.ReadLineAsync()) 
       .Repeat() 
       .Publish() 
       .RefCount() 
       .SubscribeOn(Scheduler.Default); 
    } 
} 

这可以让多个用户共享通过.Publish().RefCount()的一个输入。 .SubscribeOn(Scheduler.Default)将订阅推送到新的线程 - 没有它你阻止订阅。

+0

感谢您的提示! 但它不回答我的问题 - 为什么一个版本的工作原理: var s = Console.ReadLine(); OnNewInputLineEvent(s); OnNewInputLineEvent(s); 而另一个则丢失第一个通知: OnNewInputLineEvent(Console.ReadLine()); – Natan

+0

@Natan - 我要猜测竞赛状况,但很难证明。生成的IL非常相似 - 只是首先将字符串存储在变量中。它可能只是采取一些命令更长的时间。对我来说似乎有点狡猾。 – Enigmativity

+0

这也是我的第一个想法。我还浏览了IL代码,无法分辨出为什么这会起作用。但是,如果这是一个竞争条件,它不会那么一致!这在Debug和Release版本中是一样的,如果我直接运行或者在调试器中运行。我想我需要构建RX并尝试调试,但我不确定这是否值得付出努力。 – Natan

1

如果您将Task.Run((Action) GetInput);移至订阅后,您的代码将按需要工作。这是因为在您的原始版本中,在将OnNewInputLineEvent连接到observer.OnNext之前,将会运行第一次调用OnNewInputEvent(Console.ReadLine())

+0

正确!两个版本之间的区别在于'var s = ...'版本中,GetInput挂起等待输入并且订阅发生在对事件字段进行任何引用之前。在短版本的event(Console.ReadLine())中,编译器首先复制事件字段的值(此时该值只有一个非Rx用户),然后调用ReadLine()。即使订阅发生在第一个输入行被输入之前,编译器使用的值就是前一个。 – Natan