2017-06-18 62 views
1

我刚刚接触rx,并且一直在使用dot net中的反应式扩展工作一些网络代码。我的问题是,我用异步函数创建的tcpClients的observable没有完成,因为我期望通过我提供的令牌触发取消。下面是代码的简化版本,我有一个问题:取消使用异步功能创建的observable

public static class ListenerExtensions 
{ 
    public static IObservable<TcpClient> ToListenerObservable(
     this IPEndPoint endpoint, 
     int backlog) 
    { 
     return new TcpListener(endpoint).ToListenerObservable(backlog); 
    } 

    public static IObservable<TcpClient> ToListenerObservable(
     this TcpListener listener, 
     int backlog) 
    { 
     return Observable.Create<TcpClient>(async (observer, token) => 
     { 
      listener.Start(backlog); 

      try 
      { 
       while (!token.IsCancellationRequested) 
        observer.OnNext(await Task.Run(() => listener.AcceptTcpClientAsync(), token)); 
       //This never prints and onCompleted is never called. 
       Console.WriteLine("Completing.."); 
       observer.OnCompleted(); 
      } 
      catch (System.Exception error) 
      { 
       observer.OnError(error); 
      } 
      finally 
      { 
       //This is never executed and my progam exits without closing the listener. 
       Console.WriteLine("Stopping listener..."); 
       listener.Stop(); 
      } 
     }); 
    } 
} 
class Program 
{ 
    static void Main(string[] args) 
    { 
     var home = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2323); 
     var cancellation = new CancellationTokenSource(); 

     home.ToListenerObservable(10) 
      .Subscribe(
       onNext: c => Console.WriteLine($"{c.Client.RemoteEndPoint} connected"), 
       onError: e => Console.WriteLine($"Error: {e.Message}"), 
       onCompleted:() => Console.WriteLine("Complete"), // Never happens 
       token: cancellation.Token); 

     Console.WriteLine("Any key to cancel"); 
     Console.ReadKey(); 
     cancellation.Cancel(); 
     Thread.Sleep(1000); 
    } 
} 

如果我跑这一点,并连接到本地主机:2323我可以看到,我得到连接tcpClients的序列。但是,如果我触发canceltoken的取消,程序将退出而不关闭侦听器并发出onCompleted事件,就像我期望的那样。我究竟做错了什么?

+0

该代码永远不会运行,因为取消操作将触发抛出的OperationCancelledException。 –

+0

https://stackoverflow.com/questions/14524209/what-is-the-correct-way-to-cancel-an-async-operation-that-doesnt-accept-a-cance –

+0

而且,如果你看看这个[来自Stephen Cleary的帖子](http://blog.stephencleary.com/2015/03/a-tour-of-task-part-9-delegate-tasks.html“任务之旅,第9部分:委托任务” ),你会发现将取消标记传递给Task.Run并不完全符合你的想法。 –

回答

1

原来我对这里的一些事情感到困惑。 This article帮助我走上正轨。事实上,我最终从那里复制了代码。

public static class TaskCancellations 
{ 
    public static async Task<T> WithCancellation<T>(
     this Task<T> task, 
     CancellationToken token) 
    { 
     var cancellation = new TaskCompletionSource<bool>(); 
     using (token.Register(s => 
      (s as TaskCompletionSource<bool>).TrySetResult(true), cancellation)) 
     { 
      if (task != await Task.WhenAny(task, cancellation.Task)) 
       throw new OperationCanceledException(token); 
      return await task; 
     } 
    } 
} 

而且,像这样的的TcpListener使用它:

public static IObservable<TcpClient> ToListenerObservable(
    this TcpListener listener, 
    int backlog) 
{ 
    return Observable.Create<TcpClient>(async (observer, token) => 
    { 
     listener.Start(backlog) 
     try 
     { 
      while (!token.IsCancellationRequested) 
      { 
       observer.OnNext(await listener.AcceptTcpClientAsync() 
        .WithCancellation(token)); 
      } 
     } 
     catch (OperationCanceledException) 
     { 
      Console.WriteLine("Completing..."); 
      observer.OnCompleted(); 
     } 
     catch (System.Exception error) 
     { 
      observer.OnError(error); 
     } 
     finally 
     { 
      Console.WriteLine("Stopping listener..."); 
      listener.Stop(); 
     } 
    }); 
} 

一切都将按目前预计。

+1

我对'TcpListener' API不熟悉,但我认为你应该使用类似'using(token.Register(()=> listener.Stop()))''。 –

1

尽量避免编写太多的代码并使用取消令牌来避免写入过多的代码。在不脱离标准Rx操作员的情况下,这是一种做你正在做的事情的方式。请不要我不能完全测试这个代码,所以它可能仍然需要一点调整。

试试这个:

var query = Observable.Create<TcpClient>(o => 
{ 
    var home = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2323); 
    var listener = new TcpListener(home); 
    listener.Start(); 
    return 
     Observable 
      .Defer(() => Observable.FromAsync(() => listener.AcceptTcpClientAsync())) 
      .Repeat() 
      .Subscribe(o); 
}); 

var completer = new Subject<Unit>(); 
var subscription = 
    query 
     .TakeUntil(completer) 
     .Subscribe(
      onNext: c => Console.WriteLine($"{c.Client.RemoteEndPoint} connected"), 
      onError: e => Console.WriteLine($"Error: {e.Message}"), 
      onCompleted:() => Console.WriteLine("Complete")); 

Console.WriteLine("Enter to cancel"); 
Console.ReadLine(); 
completer.OnNext(Unit.Default); 
Thread.Sleep(1000); 

这里的关键是,询问像取消标记的completer。它自然地完成订阅,不像subscription.Dispose()完成它没有OnCompleted电话。

+0

这真的很酷。我做了这样的一次性: 'var stop = Disposable.Create(()=> listener.Stop());' 然后返回一个CompositeDisposeable。现在我不确定在退出程序之前调用stop()有多重要。首先,我想我必须能够立即启动一个监听器到同一个套接字。事实证明,这并不能保证我的理解正确。我的程序在等待状态下终止后,操作系统可能会让它们保持打开状态长达四分钟。我认为这给了我一个“使用中的地址”错误,如果我在结束它之后再次运行侦听器。 –