2012-04-09 63 views
16

交叉张贴到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9表观BufferBlock.Post/Receive/ReceiveAsync种族/错误

我知道......我真的不使用TplDataflow其最大潜力。 ATM我只是使用BufferBlock作为消息传递的安全队列,其中生产者和消费者以不同的速率运行。我看到了一些奇怪的行为,让我难以接受如何进行 。

private BufferBlock<object> messageQueue = new BufferBlock<object>(); 

public void Send(object message) 
{ 
    var accepted=messageQueue.Post(message); 
    logger.Info("Send message was called qlen = {0} accepted={1}", 
    messageQueue.Count,accepted); 
} 

public async Task<object> GetMessageAsync() 
{ 
    try 
    { 
     var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30)); 
     //despite messageQueue.Count>0 next line 
     //occasionally does not execute 
     logger.Info("message received"); 
     //....... 
    } 
    catch(TimeoutException) 
    { 
     //do something 
    } 
} 

在上面的代码(其是2000线的分布式解决方案的一部分),Send被定期调用每100ms左右。这意味着一个项目是Post编辑到messageQueue每秒约10次。这已验证。然而,偶尔看起来ReceiveAsync在超时时间内没有完成(即Post不会导致ReceiveAsync完成)并且TimeoutException在30秒后上升。在这一点上,messageQueue.Count是在数百。这是意想不到的。在发帖速度较慢的情况下(1帖/秒)也观察到这个问题,并且通常在1000个项目通过BufferBlock之前发生。

所以,要解决这个问题,我使用下面的代码,它的工作原理,但在接收时(由于上述发生的错误)

public async Task<object> GetMessageAsync() 
    { 
     try 
     { 
      object m; 
      var attempts = 0; 
      for (; ;) 
      { 
       try 
       { 
        m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1)); 
       } 
       catch (TimeoutException) 
       { 
        attempts++; 
        if (attempts >= 30) throw; 
        continue; 
       } 
       break; 

      } 

      logger.Info("message received"); 
      //....... 
     } 
     catch(TimeoutException) 
     { 
      //do something 
     } 
    } 

这看起来像一个竞争条件有时会导致1秒的延迟在TDF给我,但我不能深究为什么这不会发生在其他地方,我以类似的方式使用BufferBlock。从ReceiveAsyncReceive的实验更改没有帮助。我没有检查,但我想孤立地看,上面的代码完美地工作。这是我见过的“TPL数据流入门”tpldataflow.docx中记录的模式。

我能做些什么来达到这个目的?有没有任何指标可以帮助推断发生了什么?如果我无法创建可靠的测试用例,我可以提供哪些更多信息?

帮助!

+1

我没有看到你正在做什么或你的期望在这里有什么问题。我绝对认为你需要在MSDN论坛上保持活跃状态​​。你已经获得了@StephenToub的关注,他绝对是你想要的人。 – 2012-04-11 17:13:50

+0

不是。从来没有达到它的底部。我无法在一个小而独立的例子中重现问题。因为我只使用BufferBlock,所以我改为使用自己的异步队列实现。我不必更改任何其他代码......我只是重新实现了我使用的BufferBlock接口的各个部分。现在工作,这让我觉得有什么不妥之处,但我无法证明它。格儿。 – spender 2012-09-18 00:44:49

+0

@spendor非常有趣,奇怪的是,我找到BufferBlock之后取消了自己的异步并发队列实现...现在我必须重新考虑。谢谢。 – 2012-09-19 20:34:11

回答

1

斯蒂芬似乎认为以下是解

变种米=等待messageQueue.ReceiveAsync();代替

变种米=等待messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));

你能确认或否定这一点吗?

+0

这并没有奏效。我选择哪种ReceiveAsync超载并不重要,结果是一样的。看到我的评论上面我的决议。 – spender 2012-09-18 00:46:31