everyone。 我在传统的生产者 - 消费者场景中使用BlockingCollection。要通过一个处理一个集合中的项目,我写这个代码:C#,BlockingCollection:如何等待,直到收集少于N个项目
while (...)
{
var item = collection.Take(cancellationTokenSource.Token);
ProcessItem(item);
}
但如何处理一批的N项(等到收集了N项具有以下)? 我的解决办法是用一些临时缓冲区:
var buffer = new List<MyType>(N);
while (...)
{
var item = collection.Take(cancellationTokenSource.Token);
buffer.Add(item);
if (buffer.Count == N)
{
foreach (var item in items)
{
ProcessItem(item);
}
buffer.Clear();
}
}
但在我看来十分难看......有没有更好的方法吗?
[更新]: 这是扩展方法的原型,它使解决方案更具可读性。也许,有人会发现它有用:
public static class BlockingCollectionExtensions
{
public static IEnumerable<T> TakeBuffer<T>(this BlockingCollection<T> collection,
CancellationToken cancellationToken, Int32 bufferSize)
{
var buffer = new List<T>(bufferSize);
while (buffer.Count < bufferSize)
{
try
{
buffer.Add(collection.Take(cancellationToken));
}
catch (OperationCanceledException)
{
// we need to handle the rest of buffer,
// even if the task has been cancelled.
break;
}
}
return buffer;
}
}
与用法:
foreach (var item in collection.TakeBuffer(cancellationTokenSource.Token, 5))
{
// TODO: process items here...
}
当然,这不是一个完整的解决方案:例如,我想补充任何超时支持 - 如果没有足够的项目,但时间已过,我们需要停止等待并处理已添加到缓冲区的项目。
遗憾,但使用常规队列与锁是东西,这是我想避免的。 – Dennis 2011-04-14 07:20:57