2011-04-14 133 views
3

everyone。 我在传统的生产者 - 消费者场景中使用BlockingCollection。要通过一个处理一个集合中的项目,我写这个代码:C#,BlockingCollection:如何等待,直到收集少于N个项目

while (...) 
{ 
    var item = collection.Take(cancellationTokenSource.Token); 
    ProcessItem(item); 
} 

但如何处理一批的N项(等到收集了N项具有以下)? 我的解决办法是用一些临时缓冲区:

var buffer = new List<MyType>(N); 

while (...) 
{ 
    var item = collection.Take(cancellationTokenSource.Token); 

    buffer.Add(item); 
    if (buffer.Count == N) 
    { 
    foreach (var item in items) 
    { 
     ProcessItem(item); 
    } 

    buffer.Clear(); 
    } 
} 

但在我看来十分难看......有没有更好的方法吗?

[更新]: 这是扩展方法的原型,它使解决方案更具可读性。也许,有人会发现它有用:

public static class BlockingCollectionExtensions 
{ 
    public static IEnumerable<T> TakeBuffer<T>(this BlockingCollection<T> collection, 
     CancellationToken cancellationToken, Int32 bufferSize) 
    { 
     var buffer = new List<T>(bufferSize); 

     while (buffer.Count < bufferSize) 
     { 
      try 
      { 
       buffer.Add(collection.Take(cancellationToken)); 
      } 
      catch (OperationCanceledException) 
      { 
       // we need to handle the rest of buffer, 
       // even if the task has been cancelled. 
       break; 
      } 
     } 

     return buffer; 
    } 
} 

与用法:

foreach (var item in collection.TakeBuffer(cancellationTokenSource.Token, 5)) 
{ 
    // TODO: process items here... 
} 

当然,这不是一个完整的解决方案:例如,我想补充任何超时支持 - 如果没有足够的项目,但时间已过,我们需要停止等待并处理已添加到缓冲区的项目。

回答

0

我不觉得那个解决方案很丑。批处理是对阻塞收集所做的正交要求,应该这样处理。我会用一个干净的界面将批处理行为封装在一个BatchProcessor类中,但除此之外,我真的没有看到这种方法的问题。

0

您可能会发现队列的无锁实现以及阻塞集合是过早优化。如果退后一步并使用基于监控器的锁队列,则可能可以编写更清晰的代码。

+0

遗憾,但使用常规队列与锁是东西,这是我想避免的。 – Dennis 2011-04-14 07:20:57

0

首先我不确定你的逻辑是否正确。你说你想等到收集少于N件物品 - 是不是相反?您希望集合具有N个或更多项目,以处理N个项目。或者我是误解。

然后,我还建议你一个一个地处理项目,如果少于N个项目,或者你可能会发现你的应用程序似乎挂在N-1项目。当然,如果这是一个稳定的数据流,只有在buffer.Count> = N的情况下才可以处理。

我建议去队列和监控像GregC说的那样。

事情是这样的:

public object Dequeue() { 
    while (_queue.Count < N) { 
    Monitor.Wait(_queue); 
    } 
return _queue.Dequeue(); 
} 

public void Enqueue(object q) 
{ 
    lock (_queue) 
{ 
    _queue.Enqueue(q); 
    if (_queue.Count == N) 
    { 
     // wake up any blocked dequeue call(s) 
     Monitor.PulseAll(_queue); 
    } 
} 
} 
+0

BlockingCollection.Take等待集合是否为空,并在某个项目可用并从集合中删除时返回。我需要相同的功能,但对于N项。关于队列和锁 - 请参阅评论GregC的回复。 – Dennis 2011-04-14 07:26:24

+0

这个怎么样的变化,那么: 公开名单出列(){ 名单 returnlist =新名单(); (_queue.Count snaits 2011-04-14 11:01:49