2016-05-31 74 views
1

我一直在玩RabbitMQ和它的.NET连接器一段时间。 经过一段时间的测试后,我使用RabbitMQ作为Web应用程序和API网络中的经纪人的新系统达到了产量。 所有的应用程序都在几分钟内卡住了。 我想我打了一些依赖于操作系统的阈值或者搞砸了一些TCP堆栈(在我的客户端的TCP/IP连接甚至没有再打到网络服务器之后)。RabbitMQ,.NET和多线程

我还没有找到关于如何处理通过数十个进程和数千个线程的激烈流量传播的好材料。

我有一个系统,每秒生成10k +线程,每个人都必须通过连接删除消息,然后终止。 对于所有这些消息我只是一些'捕手'。

已经采取了一些对策。 不用于每个新线程的新连接 - >声明一个连接工厂和使用静态连接(连接上和渠道功能已经说是线程安全的) - >解决

这里的工厂

public static class Factory 
    { 
     private static IConnection sharedConnection_cl; 
     public static IConnection SharedConnection_cl 
     { 
      get 
      { 
       if (sharedConnection_cl == null) 
       { 
        sharedConnection_cl = GetRabbitMqConnection(); 
       } 
       return sharedConnection_cl; 
     } 

    private static IConnection GetRabbitMqConnection() 
      { 
       ConnectionFactory connectionFactory = new    ConnectionFactory(); 
       connectionFactory.HostName = "x.y.z.w"; 
       connectionFactory.UserName = "notTheGuestUser"; 
       connectionFactory.Password = "abcdef"; 
       connectionFactory.VirtualHost = "/dedicatedHost"; 
       return connectionFactory.CreateConnection(); 
      } 

没有代码来使用所有可用的Erlang进程。 Erlang进程阈值在不到10分钟的时间内达到(同一连接上的封闭信道不会触发服务器上相应Erlang进程的死亡).-> 在任何给定连接的最大信道计数上添加阈值并保护使用信号量进行访问。每过一段时间的连接被关闭并重新创建(当引黄关闭相应的Erlang进程终止) - >解决

下面是管理渠道门槛

public static class Factory 
{ 
    private static IConnection sharedConnection_cl; 
    private static int channelCount_int { get; set; } 
    static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1); 
    public static IConnection SharedConnection_cl 
    { 
     get 
     { 
      if (sharedConnection_cl == null) 
      { 
       sharedConnection_cl = GetRabbitMqConnection(); 
       channelCount_int = 0; 
      } 
      connectionAccessSemaphore_sem.WaitOne(); 
      if (channelCount_int > 10000) 
      { 

       sharedConnection_cl.Close(); 
       sharedConnection_cl = GetRabbitMqConnection(); 
       channelCount_int = 0; 
      } 
      else 
      { 
       channelCount_int++; 
      } 
      connectionAccessSemaphore_sem.Release(); 
      return sharedConnection_cl; 
     } 
    } 

现在的代码.. 。这只会增加操作系统标准阈值(这只会将不可避免的数据块的痛苦从几分钟延长到几个小时)...

是否有任何良好的做法来管理连接和通道,以避免操作系统门槛的出现以及satur的出现趋势?

感谢您的支持。

+0

“我对这些消息只有几个'捕手'。”你的意思是订户吗? –

+0

这可能有助于https://www.rabbitmq.com/memory.html,https://www.rabbitmq.com/persistence-conf.html –

+0

看看这个微服务架构,也许它会帮助你。 https://msdn.microsoft。com/en-us/magazine/mt595752.aspx – Jaider

回答

1

好的,解决方案已经在那里。只是向上移动信号量就是诀窍。我没有考虑到在系统重启时,当所有appPools结束时,我在连接实例分配上遇到并发问题。 - >解决

public static class Factory{ 
private static IConnection sharedConnection_cl; 
private static int channelCount_int { get; set; } 
static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1); 
public static IConnection SharedConnection_cl 
{ 
    get 
    { 
     connectionAccessSemaphore_sem.WaitOne(); 

     if (sharedConnection_cl == null) 
     { 
      sharedConnection_cl = GetRabbitMqConnection(); 
      channelCount_int = 0; 
     } 

     if (channelCount_int > 10000) 
     { 

      sharedConnection_cl.Close(); 
      sharedConnection_cl = GetRabbitMqConnection(); 
      channelCount_int = 0; 
     } 
     else 
     { 
      channelCount_int++; 
     } 
     connectionAccessSemaphore_sem.Release(); 
     return sharedConnection_cl; 
    } 
} 

不知道为什么AppPools的锁锁定的服务器上的所有TCP连接。