2010-02-06 54 views
8

我有一个场景,我需要尽快为队列删除一个项目,一旦处理完毕。 我明白从集合而在循环我无法删除的项目,但不知道如果事情 可以用枚举器等来完成...如何修改循环中的队列集合?

这仅仅是一个基本的例子抛出一个错误 “系列后,已修改枚举器被实例化。“

有什么建议吗?非常感谢!!!

代码如下:

 class Program 
     { 
      static void Main() 
      { 

       Queue<Order> queueList = GetQueueList(); 

       foreach (Order orderItem in queueList) 
       { 
        Save(orderItem); 
        Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name); 
        queueList.Dequeue(); 
       } 
       Console.Read(); 

      } 

      private static void Save(Order orderItem) 
      { 
       //we are pretending to save or do something. 
      } 

      private static Queue<Order>GetQueueList() 
      { 
       Queue<Order> orderQueue = new Queue<Order>(); 
       orderQueue.Enqueue(new Order { Id = 1, Name = "Order 1" }); 
       orderQueue.Enqueue(new Order { Id = 1, Name = "Order 2" }); 
       orderQueue.Enqueue(new Order { Id = 2, Name = "Order 3" }); 
       orderQueue.Enqueue(new Order { Id = 3, Name = "Order 4" }); 
       orderQueue.Enqueue(new Order { Id = 4, Name = "Order 5" }); 
       return orderQueue; 
      } 
     } 

     public class Order 
     { 
      public int Id { get; set; } 
      public string Name { get; set; } 
     } 

回答

10

您的foreach更改为:

while (queueList.Count > 0) 
{ 
    Order orderItem = queueList.Dequeue(); 
    Save(orderItem); 
    Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name); 
} 

编辑:

要重新处理,如果保存失败做这样的事情:

while (queueList.Count > 0) 
{ 
    Order orderItem = queueList.Dequeue(); 

    if (!Save(orderItem)) 
    { 
     queueList.Enqueue(orderItem); // Reprocess the failed save, probably want more logic to prevent infinite loop 
    } 
    else 
    { 
     Console.WriteLine("Successfully saved: {0} Name {1} ", orderItem.Id, orderItem.Name); 
    } 
} 

编辑:

约翰ķ提到线程安全这是一个有效的关注,如果你有多个线程访问相同Queue。涉及简单线程安全问题的ThreadSafeQueue类参见http://ccutilities.codeplex.com/SourceControl/changeset/view/40529#678487


编辑:这是我一直强调大家:-)

这里提到的线程安全问题的范例线程安全的例子。如图所示,默认Queue可以“遗漏”物品,同时仍然减少计数。

更新:更好地表示问题。我绝不会将空项添加到Queue,但标准Queue.Dequeue()会返回多个空值。仅这一点就没问题,但这样做会从内部收集中删除有效的项目,并减少Count。这是一个安全的假设,在这个特定的例子中,从Queue.Dequeue()操作返回的每个null项表示一个从未处理过的有效项目。

using System; 
using System.Collections.Generic; 
using System.Threading; 

namespace SO_ThreadSafeQueue 
{ 
    class Program 
    { 
     static int _QueueExceptions; 
     static int _QueueNull; 
     static int _QueueProcessed; 

     static int _ThreadSafeQueueExceptions; 
     static int _ThreadSafeQueueNull; 
     static int _ThreadSafeQueueProcessed; 

     static readonly Queue<Guid?> _Queue = new Queue<Guid?>(); 
     static readonly ThreadSafeQueue<Guid?> _ThreadSafeQueue = new ThreadSafeQueue<Guid?>(); 
     static readonly Random _Random = new Random(); 

     const int Expected = 10000000; 

     static void Main() 
     { 
      Console.Clear(); 
      Console.SetCursorPosition(0, 0); 
      Console.WriteLine("Creating queues..."); 

      for (int i = 0; i < Expected; i++) 
      { 
       Guid guid = Guid.NewGuid(); 
       _Queue.Enqueue(guid); 
       _ThreadSafeQueue.Enqueue(guid); 
      } 

      Console.SetCursorPosition(0, 0); 
      Console.WriteLine("Processing queues..."); 

      for (int i = 0; i < 100; i++) 
      { 
       ThreadPool.QueueUserWorkItem(ProcessQueue); 
       ThreadPool.QueueUserWorkItem(ProcessThreadSafeQueue); 
      } 

      int progress = 0; 

      while (_Queue.Count > 0 || _ThreadSafeQueue.Count > 0) 
      { 
       Console.SetCursorPosition(0, 0); 

       switch (progress) 
       { 
        case 0: 
         { 
          Console.WriteLine("Processing queues... |"); 
          progress = 1; 
          break; 
         } 
        case 1: 
         { 
          Console.WriteLine("Processing queues... /"); 
          progress = 2; 
          break; 
         } 
        case 2: 
         { 
          Console.WriteLine("Processing queues... -"); 
          progress = 3; 
          break; 
         } 
        case 3: 
         { 
          Console.WriteLine("Processing queues... \\"); 
          progress = 0; 
          break; 
         } 
       } 

       Thread.Sleep(200); 
      } 

      Console.SetCursorPosition(0, 0); 
      Console.WriteLine("Finished processing queues..."); 
      Console.WriteLine("\r\nQueue Count:   {0} Processed: {1, " + Expected.ToString().Length + "} Exceptions: {2,4} Null: {3}", _Queue.Count, _QueueProcessed, _QueueExceptions, _QueueNull); 
      Console.WriteLine("ThreadSafeQueue Count: {0} Processed: {1, " + Expected.ToString().Length + "} Exceptions: {2,4} Null: {3}", _ThreadSafeQueue.Count, _ThreadSafeQueueProcessed, _ThreadSafeQueueExceptions, _ThreadSafeQueueNull); 

      Console.WriteLine("\r\nPress any key..."); 
      Console.ReadKey(); 
     } 

     static void ProcessQueue(object nothing) 
     { 
      while (_Queue.Count > 0) 
      { 
       Guid? currentItem = null; 

       try 
       { 
        currentItem = _Queue.Dequeue(); 
       } 
       catch (Exception) 
       { 
        Interlocked.Increment(ref _QueueExceptions); 
       } 

       if (currentItem != null) 
       { 
        Interlocked.Increment(ref _QueueProcessed); 
       } 
       else 
       { 
        Interlocked.Increment(ref _QueueNull); 
       } 

       Thread.Sleep(_Random.Next(1, 10)); // Simulate different workload times 
      } 
     } 

     static void ProcessThreadSafeQueue(object nothing) 
     { 
      while (_ThreadSafeQueue.Count > 0) 
      { 
       Guid? currentItem = null; 

       try 
       { 
        currentItem = _ThreadSafeQueue.Dequeue(); 
       } 
       catch (Exception) 
       { 
        Interlocked.Increment(ref _ThreadSafeQueueExceptions); 
       } 

       if (currentItem != null) 
       { 
        Interlocked.Increment(ref _ThreadSafeQueueProcessed); 
       } 
       else 
       { 
        Interlocked.Increment(ref _ThreadSafeQueueNull); 
       } 

       Thread.Sleep(_Random.Next(1, 10)); // Simulate different workload times 
      } 
     } 

     /// <summary> 
     /// Represents a thread safe <see cref="Queue{T}"/> 
     /// </summary> 
     /// <typeparam name="T"></typeparam> 
     public class ThreadSafeQueue<T> : Queue<T> 
     { 
      #region Private Fields 
      private readonly object _LockObject = new object(); 
      #endregion 

      #region Public Properties 
      /// <summary> 
      /// Gets the number of elements contained in the <see cref="ThreadSafeQueue{T}"/> 
      /// </summary> 
      public new int Count 
      { 
       get 
       { 
        int returnValue; 

        lock (_LockObject) 
        { 
         returnValue = base.Count; 
        } 

        return returnValue; 
       } 
      } 
      #endregion 

      #region Public Methods 
      /// <summary> 
      /// Removes all objects from the <see cref="ThreadSafeQueue{T}"/> 
      /// </summary> 
      public new void Clear() 
      { 
       lock (_LockObject) 
       { 
        base.Clear(); 
       } 
      } 

      /// <summary> 
      /// Removes and returns the object at the beggining of the <see cref="ThreadSafeQueue{T}"/> 
      /// </summary> 
      /// <returns></returns> 
      public new T Dequeue() 
      { 
       T returnValue; 

       lock (_LockObject) 
       { 
        returnValue = base.Dequeue(); 
       } 

       return returnValue; 
      } 

      /// <summary> 
      /// Adds an object to the end of the <see cref="ThreadSafeQueue{T}"/> 
      /// </summary> 
      /// <param name="item">The object to add to the <see cref="ThreadSafeQueue{T}"/></param> 
      public new void Enqueue(T item) 
      { 
       lock (_LockObject) 
       { 
        base.Enqueue(item); 
       } 
      } 

      /// <summary> 
      /// Set the capacity to the actual number of elements in the <see cref="ThreadSafeQueue{T}"/>, if that number is less than 90 percent of current capactity. 
      /// </summary> 
      public new void TrimExcess() 
      { 
       lock (_LockObject) 
       { 
        base.TrimExcess(); 
       } 
      } 
      #endregion 
     } 

    } 
} 
+0

嗨That works.What about if we want to dequeue the item only save if was successfull.Can I still do that? 抱歉+谢谢不熟悉队列 – user9969 2010-02-06 19:54:27

+1

@ devnet247:不是真的。如果您没有将顶部的物品出列,那么您无法得到顶部的物品。您需要将失败的项目移动到队列的_tail_,就像本示例所做的那样。 – 2010-02-06 20:02:58

+0

非常感谢您的帮助。现在这样做。 我必须实现完整的线程,并且这个过程发生在wcf服务中。另一个要学习的事情。再次感谢 – user9969 2010-02-06 20:04:14

0

对我来说,它看起来像你正在尝试处理队列中的元素一个接一个。

如何在while循环中打包并处理来自Dequeue的每个元素,直到队列为空?

+0

在生产代码中,我们有一个需要处理的订单队列,在他们每个人之后,我需要将他们出列。 你能告诉我一小段你的意思吗? 谢谢 – user9969 2010-02-06 19:50:46

+0

@ devnet247:看到我​​的答案代码片段。 – 2010-02-06 19:52:21

1

的foreach作为一个合理的方式,通过队列迭代时,你是不是删除项目

当要删除和处理项目中,线程安全的,正确的方法是只删除它们 一个在并在删除后处理它们。

的方法之一是这个

// the non-thread safe way 
// 
while (queueList.Count > 0) 
{ 
    Order orderItem = queueList.Dequeue(); 
    Save(orderItem); 
    Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name); 
} 

有可能在队列queueList.Count 和queueList.Dequeue()之间变更的项目数,所以是线程安全的,你必须只使用出队,但当队列为空时,出队将抛出,因此您必须使用异常处理程序。

// the thread safe way. 
// 
while (true) 
{ 
    Order orderItem = NULL; 
    try { orderItem = queueList.Dequeue(); } catch { break; } 
    if (null != OrderItem) 
    { 
     Save(orderItem); 
     Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name); 
    } 
} 
+0

围绕捕获的尝试不会解决线程安全问题,因为在许多线程触及相同的队列时,Dequeue()可能会在实际从内部集合中删除项目时返回空对象。有关更好的选项,请参阅http://ccutilities.codeplex.com/SourceControl/changeset/view/40529#678487。 – 2010-02-06 20:00:03

+0

@Cory:谢谢。你有这个行为的参考吗? – 2010-02-06 20:01:03

+0

@John:没有经验。我最近编写了一个应用程序,它生成了100个线程来处理来自单个'Queue'文件的文件,并注意到即使我的Queue.Count变为零,其他“已处理”计数器也没有加到最初的Queue.Count '。无论我在同一个“队列”上投掷了多少个线程,我添加到“ThreadSafeQueue”的简单锁定提供了一致的结果。 – 2010-02-06 20:04:28