我想使用异步生产者/消费者队列(AsyncEx lib)通过总线一次发送一条消息。现在我只是通过异步阻塞来实现这一点。它工作正常,但我无法控制队列:(从生产者/消费者队列中移除取消的任务
所以我想出了以下解决方案,问题是取消的任务不会从队列中删除如果我将队列限制为10(因为每个消息需要1秒发送,并且最大队列时间应该是10秒左右),并且队列包含已经有8个等待任务和2个取消任务,比下一个排队的任务会抛出InvalidOperationException异常,尽管两个取消的任务不会被发送。
也许有更好的方法来做到这一点:d
class Program
{
static AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>> s_Queue =
new AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>>();
static void Main()
{
StartAsync().Wait();
}
static async Task StartAsync()
{
var sendingTask = StartSendingAsync();
var tasks = new List<Task>();
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8)))
{
for (var i = 0; i < 10; i++)
{
tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token));
}
try
{
await Task.WhenAll(tasks);
Console.WriteLine("All messages sent.");
}
catch (TaskCanceledException)
{
Console.WriteLine("At least one task was canceled.");
}
}
s_Queue.CompleteAdding();
await sendingTask;
s_Queue.Dispose();
Console.WriteLine("Queue completed.");
Console.WriteLine("Press any key to continue...");
Console.ReadKey();
}
static async Task EnqueueMessageAsync(string message, CancellationToken token)
{
var tcs = new TaskCompletionSource();
using (token.Register(() => tcs.TrySetCanceled()))
{
await s_Queue.EnqueueAsync(new Tuple<string, TaskCompletionSource>(message, tcs));
Console.WriteLine("Thread '{0}' - {1}: {2} queued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
await tcs.Task;
}
}
static async Task SendMessageAsync(string message)
{
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine("Thread '{0}' - {1}: {2} sent.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
}
static async Task StartSendingAsync()
{
while (await s_Queue.OutputAvailableAsync())
{
var t = await s_Queue.DequeueAsync();
if (t.Item2.Task.IsCanceled || t.Item2.Task.IsFaulted) continue;
await SendMessageAsync(t.Item1);
t.Item2.TrySetResult();
}
}
}
编辑1 :
正如svik指出的,InvalidOperationException仅在队列已完成时才会被抛出。所以这个解决方案甚至不能解决我最初的等待任务的非托管“队列”问题。如果有例如超过10个电话/ 10秒我有一个完整的队列和额外的非托管“等待任务队列”,就像我的异步阻塞方法(AsyncMonitor)一样。我想我必须要拿出那么一些其他的解决办法...
编辑2:
我有消息的N个不同的生产者(我不知道有多少,因为这不是我的代码),并且只有一个消费者通过总线发送消息并检查它们是否被正确发送(不是真正的字符串消息)。
下面的代码模拟的情况下的代码应该打破(队列大小为10):
- 排队10的消息(具有5秒的超时)
- 等待5秒(消息0-4被送到和消息5-9被取消)
- 排队11新消息(W/O超时)
- 消息10 - 19应该被排队,因为队列中只包含已取消的消息
- 消息20应抛出除离子(例如
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5))) { for (var i = 0; i < 10; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token)); } await Task.Delay(TimeSpan.FromSeconds(5)); for (var i = 10; i < 21; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, default(CancellationToken))); } try { await Task.WhenAll(tasks); Console.WriteLine("All messages sent."); } catch (TaskCanceledException) { Console.WriteLine("At least one task was canceled."); Console.WriteLine("Press any key to complete queue..."); Console.ReadKey(); } }
的目标是什么,我想有过应该发送的所有邮件的完全控制:QueueOverflowException),因为队列已满,这将是由生产者代码
生产者处理或不,但在我以前发布的代码中并不是这种情况,因为我只能控制队列中的消息,而不是等待排队的消息(可能有10000条消息异步等待排队,而我不知道=>生产者代码无法按预期工作,因为发送所有正在等待的消息需要花费很长时间......)
我希望这是更清楚我想要什么来实现的;)
我不明白。如果队列已满(但未完成),则排队另一个项目不应引发InvalidOperationException。 – svick
是的,你是对的,没有例外EnqueueAsync()是“等待”,直到队列不再满... –
这是一个无赖,因为我真的想要一个异常(以及如果太多的电话完成,取消的电话不应该计数:D)。所以这种方法甚至不能解决我的问题。如果有多于例如每10秒钟拨打10个电话。我仍然有一个不受控制的等待任务队列,就像使用我当前的异步阻塞方法一样...... –