2015-05-01 22 views
1

我想使用异步生产者/消费者队列(AsyncEx lib)通过总线一次发送一条消息。现在我只是通过异步阻塞来实现这一点。它工作正常,但我无法控制队列:(从生产者/消费者队列中移除取消的任务

所以我想出了以下解决方案,问题是取消的任务不会从队列中删除如果我将队列限制为10(因为每个消息需要1秒发送,并且最大队列时间应该是10秒左右),并且队列包含已经有8个等待任务和2个取消任务,比下一个排队的任务会抛出InvalidOperationException异常,尽管两个取消的任务不会被发送。

也许有更好的方法来做到这一点:d

class Program 
{ 
    static AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>> s_Queue = 
     new AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>>(); 

    static void Main() 
    { 
     StartAsync().Wait(); 
    } 

    static async Task StartAsync() 
    { 
     var sendingTask = StartSendingAsync(); 
     var tasks = new List<Task>(); 

     using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8))) 
     { 
      for (var i = 0; i < 10; i++) 
      { 
       tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token)); 
      } 

      try 
      { 
       await Task.WhenAll(tasks); 
       Console.WriteLine("All messages sent."); 
      } 
      catch (TaskCanceledException) 
      { 
       Console.WriteLine("At least one task was canceled."); 
      }     
     } 

     s_Queue.CompleteAdding(); 
     await sendingTask; 
     s_Queue.Dispose(); 
     Console.WriteLine("Queue completed."); 

     Console.WriteLine("Press any key to continue..."); 
     Console.ReadKey(); 
    } 

    static async Task EnqueueMessageAsync(string message, CancellationToken token) 
    { 

     var tcs = new TaskCompletionSource(); 
     using (token.Register(() => tcs.TrySetCanceled())) 
     { 
      await s_Queue.EnqueueAsync(new Tuple<string, TaskCompletionSource>(message, tcs)); 
      Console.WriteLine("Thread '{0}' - {1}: {2} queued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message); 
      await tcs.Task; 
     } 
    } 

    static async Task SendMessageAsync(string message) 
    { 
     await Task.Delay(TimeSpan.FromSeconds(1)); 
     Console.WriteLine("Thread '{0}' - {1}: {2} sent.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message); 
    } 

    static async Task StartSendingAsync() 
    { 
     while (await s_Queue.OutputAvailableAsync()) 
     { 
      var t = await s_Queue.DequeueAsync(); 
      if (t.Item2.Task.IsCanceled || t.Item2.Task.IsFaulted) continue; 

      await SendMessageAsync(t.Item1); 
      t.Item2.TrySetResult(); 
     } 
    } 
} 

编辑1 :

正如svik指出的,InvalidOperationException仅在队列已完成时才会被抛出。所以这个解决方案甚至不能解决我最初的等待任务的非托管“队列”问题。如果有例如超过10个电话/ 10秒我有一个完整的队列和额外的非托管“等待任务队列”,就像我的异步阻塞方法(AsyncMonitor)一样。我想我必须要拿出那么一些其他的解决办法...

编辑2:

我有消息的N个不同的生产者(我不知道有多少,因为这不是我的代码),并且只有一个消费者通过总线发送消息并检查它们是否被正确发送(不是真正的字符串消息)。

下面的代码模拟的情况下的代码应该打破(队列大小为10):

  1. 排队10的消息(具有5秒的超时)
  2. 等待5秒(消息0-4被送到和消息5-9被取消)
  3. 排队11新消息(W/O超时)
  4. 消息10 - 19应该被排队,因为队列中只包含已取消的消息
  5. 消息20应抛出除离子(例如

    using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5))) 
    { 
        for (var i = 0; i < 10; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token)); } 
        await Task.Delay(TimeSpan.FromSeconds(5)); 
        for (var i = 10; i < 21; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, default(CancellationToken))); } 
    
        try 
        { 
         await Task.WhenAll(tasks); 
         Console.WriteLine("All messages sent."); 
        } 
        catch (TaskCanceledException) 
        { 
         Console.WriteLine("At least one task was canceled."); 
         Console.WriteLine("Press any key to complete queue..."); 
         Console.ReadKey(); 
        } 
    } 
    

    的目标是什么,我想有过应该发送的所有邮件的完全控制:QueueOverflowException),因为队列已满,这将是由生产者代码

生产者处理或不,但在我以前发布的代码中并不是这种情况,因为我只能控制队列中的消息,而不是等待排队的消息(可能有10000条消息异步等待排队,而我不知道=>生产者代码无法按预期工作,因为发送所有正在等待的消息需要花费很长时间......)

我希望这是更清楚我想要什么来实现的;)

+1

我不明白。如果队列已满(但未完成),则排队另一个项目不应引发InvalidOperationException。 – svick

+0

是的,你是对的,没有例外EnqueueAsync()是“等待”,直到队列不再满... –

+0

这是一个无赖,因为我真的想要一个异常(以及如果太多的电话完成,取消的电话不应该计数:D)。所以这种方法甚至不能解决我的问题。如果有多于例如每10秒钟拨打10个电话。我仍然有一个不受控制的等待任务队列,就像使用我当前的异步阻塞方法一样...... –

回答

0

我不知道是否回答我的追问是确定的,所以我不会将其标记为答案,也许有人想出了一个更好的解决方案:P

这里首先是生产者代码:

static async Task StartAsync() 
{ 
    using (var queue = new SendMessageQueue(10, new SendMessageService())) 
    using (var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(4.5))) 
    { 
     var tasks = new List<Task>(); 

     for (var i = 0; i < 10; i++) 
     { 
      tasks.Add(queue.SendAsync(i.ToString(), timeoutTokenSource.Token)); 
     } 
     await Task.Delay(TimeSpan.FromSeconds(4.5)); 
     for (var i = 10; i < 25; i++) 
     { 
      tasks.Add(queue.SendAsync(i.ToString(), default(CancellationToken))); 
     } 

     await queue.CompleteSendingAsync(); 

     for (var i = 0; i < tasks.Count; i++) 
     { 
      try 
      { 
       await tasks[i]; 
       Console.WriteLine("Message '{0}' send.", i); 
      } 
      catch (TaskCanceledException) 
      { 
       Console.WriteLine("Message '{0}' canceled.", i); 
      } 
      catch (QueueOverflowException ex) 
      { 
       Console.WriteLine(ex.Message); 
      } 
     } 
    } 
    Console.WriteLine("Press any key to continue..."); 
    Console.ReadKey(); 
} 
  • 25消息排队超过5秒
  • 16个消息重新发送
  • 3消息无法发送(队列已满)
  • 6的消息被取消

这里是根据列表中的“队列”级。它是队列和消费者的组合。同步使用AsyncMonitor类(Stephen Cleary的AsyncEx)完成。

class SendMessageQueue : IDisposable 
{ 
    private bool m_Disposed; 
    private bool m_CompleteSending; 
    private Task m_SendingTask; 
    private AsyncMonitor m_Monitor; 
    private List<MessageTaskCompletionSource> m_MessageCollection; 
    private ISendMessageService m_SendMessageService; 

    public int Capacity { get; private set; } 


    public SendMessageQueue(int capacity, ISendMessageService service) 
    { 
     Capacity = capacity; 
     m_Monitor = new AsyncMonitor(); 
     m_MessageCollection = new List<MessageTaskCompletionSource>(); 
     m_SendMessageService = service; 
     m_SendingTask = StartSendingAsync(); 
    } 

    public async Task<bool> SendAsync(string message, CancellationToken token) 
    { 
     if (m_Disposed) { throw new ObjectDisposedException(GetType().Name); } 
     if (message == null) { throw new ArgumentNullException("message"); } 

     using (var messageTcs = new MessageTaskCompletionSource(message, token)) 
     { 
      await AddAsync(messageTcs); 
      return await messageTcs.Task; 
     } 
    } 

    public async Task CompleteSendingAsync() 
    { 
     if (m_Disposed) { throw new ObjectDisposedException(GetType().Name); } 

     using (m_Monitor.Enter()) 
     { 
      m_CompleteSending = true; 
     } 
     await m_SendingTask; 
    } 

    private async Task AddAsync(MessageTaskCompletionSource message) 
    { 
     using (await m_Monitor.EnterAsync(message.Token)) 
     { 
      if (m_CompleteSending) { throw new InvalidOperationException("Queue already completed."); } 
      if (Capacity < m_MessageCollection.Count) 
      { 
       m_MessageCollection.RemoveAll(item => item.IsCanceled); 
       if (Capacity < m_MessageCollection.Count) 
       { 
        throw new QueueOverflowException(string.Format("Queue overflow; '{0}' couldn't be enqueued.", message.Message)); 
       } 
      } 
      m_MessageCollection.Add(message); 
     } 
     m_Monitor.Pulse(); // signal new message 
     Console.WriteLine("Thread '{0}' - {1}: '{2}' enqueued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message.Message); 
    } 

    private async Task<MessageTaskCompletionSource> TakeAsync() 
    { 
     using (await m_Monitor.EnterAsync()) 
     { 
      var message = m_MessageCollection.ElementAt(0); 
      m_MessageCollection.RemoveAt(0); 
      return message; 
     } 
    } 

    private async Task<bool> OutputAvailableAsync() 
    { 
     using (await m_Monitor.EnterAsync()) 
     { 
      if (m_MessageCollection.Count > 0) { return true; } 
      else if (m_CompleteSending) { return false; } 

      await m_Monitor.WaitAsync(); 
      return true; 
     } 
    } 

    private async Task StartSendingAsync() 
    { 
     while (await OutputAvailableAsync()) 
     { 
      var message = await TakeAsync(); 
      if (message.IsCanceled) continue; 
      try 
      { 
       var result = await m_SendMessageService.SendMessageAsync(message.Message, message.Token); 
       message.TrySetResult(result); 
      } 
      catch (TaskCanceledException) { message.TrySetCanceled(); } 
     } 
    } 

    public void Dispose() 
    { 
     Dispose(true); 
     GC.SuppressFinalize(this); 
    } 

    protected void Dispose(bool disposing) 
    { 
     if (m_Disposed) return; 
     if (disposing) 
     { 
      if (m_MessageCollection != null) 
      { 
       var tmp = m_MessageCollection; 
       m_MessageCollection = null; 
       tmp.ForEach(item => item.Dispose()); 
       tmp.Clear(); 
      } 
     } 
     m_Disposed = true; 
    } 

    #region MessageTaskCompletionSource Class 

    class MessageTaskCompletionSource : TaskCompletionSource<bool>, IDisposable 
    { 
     private bool m_Disposed; 
     private IDisposable m_CancellationTokenRegistration; 

     public string Message { get; private set; } 
     public CancellationToken Token { get; private set; } 
     public bool IsCompleted { get { return Task.IsCompleted; } } 
     public bool IsCanceled { get { return Task.IsCanceled; } } 
     public bool IsFaulted { get { return Task.IsFaulted; } } 


     public MessageTaskCompletionSource(string message, CancellationToken token) 
     { 
      m_CancellationTokenRegistration = token.Register(() => TrySetCanceled()); 
      Message = message; 
      Token = token; 
     } 

     public void Dispose() 
     { 
      Dispose(true); 
      GC.SuppressFinalize(this); 
     } 

     protected void Dispose(bool disposing) 
     { 
      if (m_Disposed) return; 
      if (disposing) 
      { 
       TrySetException(new ObjectDisposedException(GetType().Name)); 

       if (m_CancellationTokenRegistration != null) 
       { 
        var tmp = m_CancellationTokenRegistration; 
        m_CancellationTokenRegistration = null; 
        tmp.Dispose(); 
       } 
      } 
      m_Disposed = true; 
     } 
    } 

    #endregion 
} 

现在我用这个解决方案行,它完成了这项工作:D

+0

请将您的答案标记为已接受。如果你不这样做,它会自动升级到头版每隔几个飞蛾作为一个问题,没有被接受的答案。 –