我刚刚接触rx,并且一直在使用dot net中的反应式扩展工作一些网络代码。我的问题是,我用异步函数创建的tcpClients的observable没有完成,因为我期望通过我提供的令牌触发取消。下面是代码的简化版本,我有一个问题:取消使用异步功能创建的observable
public static class ListenerExtensions
{
public static IObservable<TcpClient> ToListenerObservable(
this IPEndPoint endpoint,
int backlog)
{
return new TcpListener(endpoint).ToListenerObservable(backlog);
}
public static IObservable<TcpClient> ToListenerObservable(
this TcpListener listener,
int backlog)
{
return Observable.Create<TcpClient>(async (observer, token) =>
{
listener.Start(backlog);
try
{
while (!token.IsCancellationRequested)
observer.OnNext(await Task.Run(() => listener.AcceptTcpClientAsync(), token));
//This never prints and onCompleted is never called.
Console.WriteLine("Completing..");
observer.OnCompleted();
}
catch (System.Exception error)
{
observer.OnError(error);
}
finally
{
//This is never executed and my progam exits without closing the listener.
Console.WriteLine("Stopping listener...");
listener.Stop();
}
});
}
}
class Program
{
static void Main(string[] args)
{
var home = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2323);
var cancellation = new CancellationTokenSource();
home.ToListenerObservable(10)
.Subscribe(
onNext: c => Console.WriteLine($"{c.Client.RemoteEndPoint} connected"),
onError: e => Console.WriteLine($"Error: {e.Message}"),
onCompleted:() => Console.WriteLine("Complete"), // Never happens
token: cancellation.Token);
Console.WriteLine("Any key to cancel");
Console.ReadKey();
cancellation.Cancel();
Thread.Sleep(1000);
}
}
如果我跑这一点,并连接到本地主机:2323我可以看到,我得到连接tcpClients的序列。但是,如果我触发canceltoken的取消,程序将退出而不关闭侦听器并发出onCompleted事件,就像我期望的那样。我究竟做错了什么?
该代码永远不会运行,因为取消操作将触发抛出的OperationCancelledException。 –
https://stackoverflow.com/questions/14524209/what-is-the-correct-way-to-cancel-an-async-operation-that-doesnt-accept-a-cance –
而且,如果你看看这个[来自Stephen Cleary的帖子](http://blog.stephencleary.com/2015/03/a-tour-of-task-part-9-delegate-tasks.html“任务之旅,第9部分:委托任务” ),你会发现将取消标记传递给Task.Run并不完全符合你的想法。 –