2

我需要实现可以从多个线程填充请求的队列。当这个队列变得大于1000个完成的请求时,这个请求应该被存储到数据库中。这是我的实现:如何从ConcurrentQueue消耗chuncks正确

public class RequestQueue 
{ 
    private static BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>(); 
    private static ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>(); 

    private static volatile bool isLoading = false; 
    private static object _lock = new object(); 

    public static void Launch() 
    { 
     Task.Factory.StartNew(execute); 
    } 

    public static void Add(VerificationRequest request) 
    { 
     _queue.Add(request); 
    } 

    public static void AddRange(List<VerificationRequest> requests) 
    { 
     Parallel.ForEach(requests, new ParallelOptions() {MaxDegreeOfParallelism = 3}, 
      (request) => { _queue.Add(request); }); 
    } 


    private static void execute() 
    { 
     Parallel.ForEach(_queue.GetConsumingEnumerable(), new ParallelOptions {MaxDegreeOfParallelism = 5}, EnqueueSaveRequest); 
    } 

    private static void EnqueueSaveRequest(VerificationRequest request) 
    { 
     _storageQueue.Enqueue(new RequestExecuter().ExecuteVerificationRequest(request)); 
     if (_storageQueue.Count > 1000 && !isLoading) 
     { 
      lock (_lock) 
      { 
       if (_storageQueue.Count > 1000 && !isLoading) 
       { 
        isLoading = true; 

        var requestChunck = new List<VerificationRequest>(); 
        VerificationRequest req; 
        for (var i = 0; i < 1000; i++) 
        { 
         if(_storageQueue.TryDequeue(out req)) 
          requestChunck.Add(req); 
        } 
        new VerificationRequestRepository().InsertRange(requestChunck); 

        isLoading = false; 
       } 
      } 
     }    
    } 
} 

有没有什么办法来实现这个没有锁和isLoading?

+0

为什么你不希望使用锁?我的意思是在这种情况下似乎不会影响性能。 – Evk

+0

我同意,但也许有更好的办法。另外我不确定我是否实现了正确加载isLoading的锁定 – xalz

+0

为什么你甚至需要'isLoading'?如果您只是将其删除,会发生什么变化? – zerkms

回答

3

做你问什么,最简单的方法是在TPL Dataflow库使用的块。例如

var batchBlock = new BatchBlock<VerificationRequest>(1000); 
var exportBlock = new ActionBlock<VerificationRequest[]>(records=>{ 
       new VerificationRequestRepository().InsertRange(records); 
}; 

batchBlock.LinkTo(exportBlock , new DataflowLinkOptions { PropagateCompletion = true }); 

就是这样。

您可以将消息发送到起始块与

batchBlock.Post(new VerificationRequest(...)); 

一旦你完成你的工作,你可以取下来的整条管线,并通过调用batchBlock.Complete();刷新任何剩余的消息,并等待进行最后的块来完成:

batchBlock.Complete(); 
await exportBlock.Completion; 

BatchBlock批高达1000所记录到的1000个项目阵列,并将它们传递到下一个块。一个ActionBlock默认只使用1个任务,所以它是线程安全的。你可以使用你的资料库的现有实例,而不必担心跨线程访问:

var repository=new VerificationRequestRepository(); 
var exportBlock = new ActionBlock<VerificationRequest[]>(records=>{ 
       repository.InsertRange(records); 
}; 

几乎所有的块具有并行输入缓冲区。每个块都运行在自己的TPL任务上,因此每个步骤都可以同时运行。这意味着你异步执行“免费”,如果你有多个链接的步骤也很重要,比如你用TransformBlock修改流经管道的消息。

我使用这种管道创建调用外部服务管道,解析反应,生成最终的记录,批,并结合使用SqlBulkCopy的块发送到数据库。