交叉张贴到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9表观BufferBlock.Post/Receive/ReceiveAsync种族/错误
我知道......我真的不使用TplDataflow其最大潜力。 ATM我只是使用BufferBlock
作为消息传递的安全队列,其中生产者和消费者以不同的速率运行。我看到了一些奇怪的行为,让我难以接受如何进行 。
private BufferBlock<object> messageQueue = new BufferBlock<object>();
public void Send(object message)
{
var accepted=messageQueue.Post(message);
logger.Info("Send message was called qlen = {0} accepted={1}",
messageQueue.Count,accepted);
}
public async Task<object> GetMessageAsync()
{
try
{
var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
//despite messageQueue.Count>0 next line
//occasionally does not execute
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
在上面的代码(其是2000线的分布式解决方案的一部分),Send
被定期调用每100ms左右。这意味着一个项目是Post
编辑到messageQueue
每秒约10次。这已验证。然而,偶尔看起来ReceiveAsync
在超时时间内没有完成(即Post
不会导致ReceiveAsync
完成)并且TimeoutException
在30秒后上升。在这一点上,messageQueue.Count
是在数百。这是意想不到的。在发帖速度较慢的情况下(1帖/秒)也观察到这个问题,并且通常在1000个项目通过BufferBlock
之前发生。
所以,要解决这个问题,我使用下面的代码,它的工作原理,但在接收时(由于上述发生的错误)
public async Task<object> GetMessageAsync()
{
try
{
object m;
var attempts = 0;
for (; ;)
{
try
{
m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
}
catch (TimeoutException)
{
attempts++;
if (attempts >= 30) throw;
continue;
}
break;
}
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
这看起来像一个竞争条件有时会导致1秒的延迟在TDF给我,但我不能深究为什么这不会发生在其他地方,我以类似的方式使用BufferBlock
。从ReceiveAsync
到Receive
的实验更改没有帮助。我没有检查,但我想孤立地看,上面的代码完美地工作。这是我见过的“TPL数据流入门”tpldataflow.docx中记录的模式。
我能做些什么来达到这个目的?有没有任何指标可以帮助推断发生了什么?如果我无法创建可靠的测试用例,我可以提供哪些更多信息?
帮助!
我没有看到你正在做什么或你的期望在这里有什么问题。我绝对认为你需要在MSDN论坛上保持活跃状态。你已经获得了@StephenToub的关注,他绝对是你想要的人。 – 2012-04-11 17:13:50
不是。从来没有达到它的底部。我无法在一个小而独立的例子中重现问题。因为我只使用BufferBlock,所以我改为使用自己的异步队列实现。我不必更改任何其他代码......我只是重新实现了我使用的BufferBlock接口的各个部分。现在工作,这让我觉得有什么不妥之处,但我无法证明它。格儿。 – spender 2012-09-18 00:44:49
@spendor非常有趣,奇怪的是,我找到BufferBlock之后取消了自己的异步并发队列实现...现在我必须重新考虑。谢谢。 – 2012-09-19 20:34:11