2017-02-16 168 views
0

我问了一个问题here关于为什么使用Thread.Run启动一个进程并没有像我期望的那样执行尽可能多的并发请求。同时处理rabbitmq消息

这个问题背后的原因是我试图创建一个类,它可以将消息从rabbitmq队列中拉出并同时处理它们达到最大并发消息数。

为此,我在EventingBasicConsumer类的Received处理程序中结束了以下操作。

async void Handle(EventArgs e) 
{ 
    await _semaphore.WaitAsync(); 

    var thread = new Thread(() => 
    { 
     Process(e); 
     _semaphore.Release(); 
     _channel.BasicAck(....); 
    }); 
    thread.Start(); 
} 

但是,对上一篇文章的评论并不是要启动一个线程,除非进行CPU绑定工作。

上述处理程序不知道该作品是CPU绑定,网络,磁盘还是其他。 (Process是一种抽象方法)。

即使如此,我认为我必须在这里启动一个线程或任务,否则Process方法会阻塞rabbitmq线程,并且在完成之前不会再调用该事件处理程序。所以我只能一次处理一种方法。

在这里开始一个新的Thread好吗?最初我曾经使用过Task.Run,但是这并没有产生尽可能多的工人。查看其他帖子。

仅供参考。通过在信号量上设置InitialCount来限制并发线程的数量。

回答

0

正如已经在链接问题中所说的,大量的线程并不能保证性能,就好像它们的数量超过了逻辑内核的数量一样,你得到了一个thread starvation的情况,没有真正的工作正在进行。

但是,如果您仍然需要处理并发操作的数量,您可以试试TPL Dataflow库,设置为MaxDegreeOfParallelism,如this tutorial

var workerBlock = new ActionBlock<EventArgs>(
    // Process event 
    e => Process(e), 
    // Specify a maximum degree of parallelism. 
    new ExecutionDataflowBlockOptions 
    { 
     MaxDegreeOfParallelism = InitialCount 
    }); 
var bufferBlock = new BufferBlock(); 
// link the blocks for automatically propagading the messages 
bufferBlock.LinkTo(workerBlock); 

// asynchronously send the message 
await bufferBlock.SendAsync(...); 
// synchronously send the message 
bufferBlock.Post(...); 

BufferBlock是一个队列,所以消息的顺序将被保留。此外,您还可以连接带有过滤拉姆达块添加不同的处理程序(具有不同的并行度):

bufferBlock.LinkTo(cpuWorkerBlock, e => e is CpuEventArgs); 
bufferBlock.LinkTo(networkWorkerBlock, e => e is NetworkEventArgs); 
bufferBlock.LinkTo(diskWorkerBlock, e => e is DiskEventArgs); 

,但在这种情况下,您应该设置在链末端的默认处理程序,所以消息不会消失(你可以使用一个NullTarget块这样):

bufferBlock.LinkTo(DataflowBlock.NullTarget<EventArgs>); 

而且,该块可以是一个观察者,所以他们完全与Reactive Extensions在UI方面的工作。