2010-09-03 50 views
0

任何人都可以看到这个生产者/消费者唯一键控缓冲区impl的任何问题?这个想法是,如果你用相同的键添加项目进行处理,只有最后的值将被处理,旧的/现有的值将被丢弃。这是生产者/消费者唯一键控缓冲区的好实现吗?

public sealed class PCKeyedBuffer<K,V> 
{ 
    private readonly object _locker = new object(); 
    private readonly Thread _worker; 
    private readonly IDictionary<K, V> _items = new Dictionary<K, V>(); 
    private readonly Action<V> _action; 
    private volatile bool _shutdown; 

    public PCKeyedBuffer(Action<V> action) 
    { 
     _action = action; 
     (_worker = new Thread(Consume)).Start(); 
    } 

    public void Shutdown(bool waitForWorker) 
    { 
     _shutdown = true; 
     if (waitForWorker) 
       _worker.Join(); 
    } 

    public void Add(K key, V value) 
    { 
     lock (_locker) 
     { 
      _items[key] = value; 
      Monitor.Pulse(_locker); 
     } 
    } 

    private void Consume() 
    { 
     while (true) 
     { 
      IList<V> values; 
      lock (_locker) 
      { 
       while (_items.Count == 0) Monitor.Wait(_locker); 
       values = new List<V>(_items.Values); 
       _items.Clear(); 
      } 
      foreach (V value in values) 
      { 
       _action(value); 
      } 

      if(_shutdown) return; 
     } 
    } 
} 


    static void Main(string[] args) 
    { 
     PCKeyedBuffer<string, double> l = new PCKeyedBuffer<string, double>(delegate(double d) 
                       { 
                        Thread.Sleep(10); 
                        Console.WriteLine(
                         "Processed: " + d.ToString()); 
                       }); 
     for (double i = 0; i < 100; i++) 
     { 
      l.Add(i.ToString(), i); 
     } 
     for (double i = 0; i < 100; i++) 
     { 
      l.Add(i.ToString(), i); 
     } 
     for (double i = 0; i < 100; i++) 
     { 
      l.Add(i.ToString(), i); 
     } 

     Console.WriteLine("Done Enqeueing"); 
     Console.ReadLine(); 
    } 

回答

2

快速后一旦超过我想说的是,在Consume方法

while (_items.Count == 0) Monitor.Wait(_locker); 

下面的代码也许应该等待使用超时,检查_Shutdown标志每个迭代。特别是因为你没有设置你的消费者线程为后台线程。

另外,Consume方法不会显示出很高的可伸缩性,因为它单手尝试处理整个项目队列。当然,这可能取决于物品生产的速度。我可能会让消费者专注于列表中的单个项目,然后使用TPL来运行多个并发使用者,这样您就可以利用多核心,同时让TPL为您平衡工作负载。为了减少消费者处理单个物品所需的锁定,您可以使用ConcurrentDictionary

0

这是创建实际上正确的自定义生产者/消费者的少数尝试之一。所以在这方面做得很好。但是,就像克里斯指出的那样,当Monitor.Wait被阻止时,你的停车标志将被忽略。没有必要重新提出他的建议来解决这个问题。我可以提供的建议是使用BlockingCollection而不是手动执行Wait/Pulse调用。这也将解决关闭问题,因为Take方法是可取消的。如果您不使用.NET 4.0,则可以在Stephen链接的Reactive Extension下载中找到它。如果这不是一个选项,那么斯蒂芬Toub有一个正确的实施here(除非他是不可取消的,但你总是可以做一个Thread.Interrupt安全地解锁它)。你可以做的是将KeyValuePair项目输入到队列中,而不是使用Dictionary