2013-02-15

SignalR with Redis message bus failover using BookSleeve's ConnectionUtils.Connect()

I am trying to create a Redis message bus failover scenario with a SignalR app.

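First, we tried a simple hardware load balancer (HLB) failover, which just monitored two Redis servers. The SignalR application pointed to the single HLB endpoint. I then failed one server, but was unable to get any messages through on the second Redis server without recycling the SignalR app pool. Presumably this is because the setup commands need to be issued to the new Redis message bus.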

As of SignalR RC1, Microsoft.AspNet.SignalR.Redis.RedisMessageBus uses BookSleeve's RedisConnection() to connect to a single Redis server for pub/sub.

I created a new class, RedisMessageBusCluster(), that uses BookSleeve's ConnectionUtils.Connect() to connect to one of the servers in a cluster of Redis servers.

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 
using BookSleeve; 
using Microsoft.AspNet.SignalR.Infrastructure; 

namespace Microsoft.AspNet.SignalR.Redis 
{ 
    /// <summary> 
    /// WIP: Getting scaleout for Redis working 
    /// </summary> 
    public class RedisMessageBusCluster : ScaleoutMessageBus 
    { 
     private readonly int _db; 
     private readonly string[] _keys; 
     private RedisConnection _connection; 
     private RedisSubscriberConnection _channel; 
     private Task _connectTask; 

     private readonly TaskQueue _publishQueue = new TaskQueue(); 

     public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver) 
      : base(resolver) 
     { 
      _db = db; 
      _keys = keys.ToArray(); 

      // uses a list of connections 
      _connection = ConnectionUtils.Connect(serverList); 

      //_connection = new RedisConnection(host: server, port: port, password: password); 

      _connection.Closed += OnConnectionClosed; 
      _connection.Error += OnConnectionError; 


      // Start the connection - TODO: this Open can be removed since the connection is already open, but _connectTask is used later on 
      _connectTask = _connection.Open().Then(() => 
      { 
       // Create a subscription channel in redis 
       _channel = _connection.GetOpenSubscriberChannel(); 

       // Subscribe to the registered connections 
       _channel.Subscribe(_keys, OnMessage); 

       // Dirty hack but it seems like subscribe returns before the actual 
       // subscription is properly setup in some cases 
       while (_channel.SubscriptionCount == 0) 
       { 
        Thread.Sleep(500); 
       } 
      }); 
     } 


     protected override Task Send(Message[] messages) 
     { 
      return _connectTask.Then(msgs => 
      { 
       var taskCompletionSource = new TaskCompletionSource<object>(); 

       // Group messages by source (connection id) 
       var messagesBySource = msgs.GroupBy(m => m.Source); 

       SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource); 

       return taskCompletionSource.Task; 
      }, 
      messages); 
     } 

     private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource) 
     { 
      if (!enumerator.MoveNext()) 
      { 
       taskCompletionSource.TrySetResult(null); 
      } 
      else 
      { 
       IGrouping<string, Message> group = enumerator.Current; 

       // Get the channel index we're going to use for this message 
       // (unsigned modulo: Math.Abs(int.MinValue) would throw an OverflowException) 
       int index = (int)((uint)group.Key.GetHashCode() % (uint)_keys.Length); 

       string key = _keys[index]; 

       // Increment the channel number 
       _connection.Strings.Increment(_db, key) 
            .Then((id, k) => 
            { 
             var message = new RedisMessage(id, group.ToArray()); 

             return _connection.Publish(k, message.GetBytes()); 
            }, key) 
            .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource) 
            .ContinueWithNotComplete(taskCompletionSource); 
      } 
     } 

     private void OnConnectionClosed(object sender, EventArgs e) 
     { 
      // Should we auto reconnect? 
      if (true) 
      { 
       ; 
      } 
     } 

     private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e) 
     { 
      // How do we bubble errors? 
      if (true) 
      { 
       ; 
      } 
     } 

     private void OnMessage(string key, byte[] data) 
     { 
      // The key is the stream id (channel) 
      var message = RedisMessage.Deserialize(data); 

      _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages)); 
     } 

     protected override void Dispose(bool disposing) 
     { 
      if (disposing) 
      { 
       if (_channel != null) 
       { 
        _channel.Unsubscribe(_keys); 
        _channel.Close(abort: true); 
       } 

       if (_connection != null) 
       { 
        _connection.Close(abort: true); 
       }     
      } 

      base.Dispose(disposing); 
     } 
    } 
} 
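`Send` groups outgoing messages by `Source`, and `SendImpl` then walks the groups one at a time: it increments a per-channel counter to obtain a message id, publishes the group under that id, and only after that publish completes moves on to the next group. The sketch below reproduces that sequencing with `async`/`await` in place of the `Then`/`ContinueWithNotComplete` chain, assuming C# 7 or later. `SimpleMessage`, `InMemoryRedis` and `ScaleoutSendSketch` are hypothetical stand-ins, not BookSleeve or SignalR types, so nothing here talks to a real Redis server.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical message type carrying only what the sketch needs.
public sealed class SimpleMessage
{
    public SimpleMessage(string source, string body) { Source = source; Body = body; }
    public string Source { get; }
    public string Body { get; }
}

// Hypothetical in-memory stand-in for the INCR and PUBLISH calls made by SendImpl.
public class InMemoryRedis
{
    private readonly Dictionary<string, long> _counters = new Dictionary<string, long>();

    public List<(string Channel, long Id, string[] Bodies)> Published { get; } =
        new List<(string Channel, long Id, string[] Bodies)>();

    public Task<long> IncrementAsync(string key)
    {
        _counters.TryGetValue(key, out long value); // missing key starts at 0, like INCR
        _counters[key] = ++value;
        return Task.FromResult(value);
    }

    public Task PublishAsync(string channel, long id, string[] bodies)
    {
        Published.Add((channel, id, bodies));
        return Task.CompletedTask;
    }
}

public static class ScaleoutSendSketch
{
    public static async Task SendAsync(InMemoryRedis redis, string[] keys, IEnumerable<SimpleMessage> messages)
    {
        // Group messages by source (connection id), as Send does.
        foreach (var group in messages.GroupBy(m => m.Source))
        {
            // Pick a channel from the group key; unsigned modulo cannot overflow.
            int index = (int)((uint)group.Key.GetHashCode() % (uint)keys.Length);
            string key = keys[index];

            // Sequential: the next group starts only after this INCR + PUBLISH completes.
            long id = await redis.IncrementAsync(key);
            await redis.PublishAsync(key, id, group.Select(m => m.Body).ToArray());
        }
    }
}
```

Because each group waits for its own increment and publish, ids on a given channel are assigned in send order; the trade-off is that one slow or failed publish holds up every later group in the same `Send` call.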

BookSleeve has its own mechanism for determining a master and will automatically fail over to another server, and I am now testing this with SignalR.Chat.

In web.config, I set the list of available servers:

<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/> 

Then in Application_Start():

// Redis cluster server list 
string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"]; 

List<string> eventKeys = new List<string>(); 
eventKeys.Add("SignalR.Redis.FailoverTest"); 
GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys); 
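The `redis.serverList` value is a comma-separated list of `host:port` entries. Purely to illustrate that format (this is not BookSleeve's own parsing; the `ServerListParser` name and the 6379 fallback port are assumptions), a minimal sketch that splits it into endpoints:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not BookSleeve's parser): splits "host:port,host:port" into (Host, Port) pairs.
// Entries without a port fall back to an assumed default of 6379; IPv6 literals are not handled.
public static class ServerListParser
{
    public static List<(string Host, int Port)> Parse(string serverList, int defaultPort = 6379)
    {
        var endpoints = new List<(string Host, int Port)>();
        foreach (string raw in serverList.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries))
        {
            string entry = raw.Trim();
            if (entry.Length == 0)
            {
                continue; // tolerate stray whitespace such as "a:1, ,b:2"
            }

            int colon = entry.LastIndexOf(':');
            if (colon < 0)
            {
                endpoints.Add((entry, defaultPort));
                continue;
            }

            string portText = entry.Substring(colon + 1);
            if (!int.TryParse(portText, out int port))
            {
                throw new FormatException($"Invalid port '{portText}' in server list entry '{entry}'.");
            }
            endpoints.Add((entry.Substring(0, colon), port));
        }
        return endpoints;
    }
}
```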

I added two additional methods to Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions:

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys) 
{ 
    return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys); 
} 

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys) 
{ 
    var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver)); 
    resolver.Register(typeof(IMessageBus),() => bus.Value); 

    return resolver; 
} 
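Wrapping the bus in `Lazy<RedisMessageBusCluster>` means the bus (and therefore its Redis connection) is only created the first time `IMessageBus` is resolved, and every later resolution returns that same instance. `Lazy<T>` constructed with just a factory uses `LazyThreadSafetyMode.ExecutionAndPublication`, so concurrent first calls still construct it once. A self-contained sketch of the pattern, with a hypothetical `FakeBus` and a plain `Func<object>` standing in for `resolver.Register`:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for RedisMessageBusCluster; counts how often it is constructed.
public class FakeBus
{
    public static int Constructed;
    public FakeBus() { Interlocked.Increment(ref Constructed); }
}

public static class LazyRegistrationDemo
{
    // Mirrors: var bus = new Lazy<...>(() => new ...); resolver.Register(typeof(IMessageBus), () => bus.Value);
    public static Func<object> CreateFactory()
    {
        var bus = new Lazy<FakeBus>(() => new FakeBus()); // default mode: ExecutionAndPublication
        return () => bus.Value;
    }

    // Calls the factory from several threads at once and returns every instance it handed out.
    public static object[] ResolveConcurrently(Func<object> factory, int callers)
    {
        var results = new object[callers];
        Parallel.For(0, callers, i => results[i] = factory());
        return results;
    }
}
```

One side effect worth keeping in mind: in this mode `Lazy<T>` also caches an exception thrown by the factory, so if constructing the bus throws (for example, a synchronous connect failure), every later resolution rethrows that same exception instead of retrying.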

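Now the problem: if I have several breakpoints enabled until after a user name has been added, and then disable all breakpoints, the application works as expected. However, with breakpoints disabled from the start, there seems to be a race condition during the connection process that causes it to fail.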

So, in RedisMessageBusCluster():

// Start the connection 
    _connectTask = _connection.Open().Then(() => 
    { 
     // Create a subscription channel in redis 
     _channel = _connection.GetOpenSubscriberChannel(); 

     // Subscribe to the registered connections 
     _channel.Subscribe(_keys, OnMessage); 

     // Dirty hack but it seems like subscribe returns before the actual 
     // subscription is properly setup in some cases 
     while (_channel.SubscriptionCount == 0) 
     { 
      Thread.Sleep(500); 
     } 
    }); 

I tried adding both a Task.Wait and even an additional Sleep() (not shown above), which did wait as expected, but I still got the errors.
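The `while (_channel.SubscriptionCount == 0) Thread.Sleep(500);` loop blocks the thread running the continuation and never gives up if the subscription is never confirmed (for instance, if the connection is closed underneath it). One alternative, sketched here assuming `async`/`await` is available (C# 5 / .NET 4.5 or later) and that the condition is cheap to poll, is an asynchronous wait with a timeout. `PollingWait.UntilAsync` is a hypothetical helper, not a SignalR or BookSleeve API:

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class PollingWait
{
    // Polls `condition` every `interval` until it returns true or `timeout` elapses.
    // Returns true if the condition was met, false on timeout. No thread is blocked while waiting.
    public static async Task<bool> UntilAsync(Func<bool> condition, TimeSpan timeout, TimeSpan interval)
    {
        var stopwatch = Stopwatch.StartNew();
        while (!condition())
        {
            if (stopwatch.Elapsed >= timeout)
            {
                return false;
            }
            await Task.Delay(interval);
        }
        return true;
    }
}
```

In the constructor's continuation the condition would be something like `() => _channel.SubscriptionCount > 0`; on a `false` result the connect task could be faulted rather than spinning forever, so `Send` fails visibly instead of hanging.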

The recurring error seems to be in BookSleeve's MessageQueue.cs, around line 71:

A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll 
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed 
    at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71 
    at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910 
    at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826 
    at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277 
    at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821 
    --- End of inner exception stack trace --- 
---> (Inner Exception #0) System.InvalidOperationException: The queue is closed 
    at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71 
    at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910 
    at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826 
    at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277 
    at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<--- 



public void Enqueue(RedisMessage item, bool highPri) 
{ 
    lock (stdPriority) 
    { 
        if (closed) 
        { 
            throw new InvalidOperationException("The queue is closed"); 
        } 
        // ... (rest of the method omitted)
    }
}

which is where the "queue is closed" exception is being thrown.
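That guard means that once the queue has been closed (presumably because the connection that owns it was closed), every further command, including the `Strings.Increment` call in `SendImpl`, fails with `InvalidOperationException` instead of being queued. A simplified, self-contained model of that behavior; `ClosableQueue<T>` and its `TryEnqueue` method are hypothetical and not part of BookSleeve. It shows the difference between throwing and letting the caller decide what to do:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of a queue that rejects work after Close().
public class ClosableQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _sync = new object();
    private bool _closed;

    // Same guard as the BookSleeve excerpt: enqueueing after close throws.
    public void Enqueue(T item)
    {
        lock (_sync)
        {
            if (_closed)
            {
                throw new InvalidOperationException("The queue is closed");
            }
            _items.Enqueue(item);
        }
    }

    // Alternative: report failure so the caller can fail its task or retry on another connection.
    public bool TryEnqueue(T item)
    {
        lock (_sync)
        {
            if (_closed) return false;
            _items.Enqueue(item);
            return true;
        }
    }

    public void Close()
    {
        lock (_sync) { _closed = true; }
    }

    public int Count
    {
        get { lock (_sync) { return _items.Count; } }
    }
}
```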

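I foresee another issue: since the Redis connection is made in Application_Start(), there may be problems when "reconnecting" to another server. I believe this was fine with the singular RedisConnection(), where there is only one connection to choose from. With the introduction of ConnectionUtils.Connect(), however, I'd like to hear from @dfowler or the other SignalR folks how SignalR handles this scenario.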
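On the reconnect question: one simple approach, sketched here only as an illustration (it is not BookSleeve's master-selection logic, and `FailoverConnector` and the `tryConnect` delegate are hypothetical), is to walk the configured server list and use the first endpoint that connects. Redis pub/sub subscriptions belong to the connection that issued `SUBSCRIBE`, so after switching servers the `Subscribe(_keys, OnMessage)` call would have to be made again on the new connection.

```csharp
using System;
using System.Collections.Generic;

public static class FailoverConnector
{
    // Tries each endpoint in order and returns the first connection that succeeds.
    // `tryConnect` is a hypothetical delegate: it returns a connection or throws on failure.
    public static TConnection ConnectFirstAvailable<TConnection>(
        IEnumerable<string> endpoints, Func<string, TConnection> tryConnect)
    {
        var errors = new List<Exception>();
        foreach (var endpoint in endpoints)
        {
            try
            {
                return tryConnect(endpoint);
            }
            catch (Exception ex)
            {
                errors.Add(ex); // remember why this endpoint failed, then try the next one
            }
        }
        throw new AggregateException("No Redis endpoint could be reached", errors);
    }
}
```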

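I'll take a look, but the first thing that jumps out is that you don't need to call `Open`, since you already have an open connection. That said, I won't be able to look right away, as I'm about to board a plane – 2013-02-15 20:16:32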

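I believe there are two issues here: 1) how BookSleeve handles failover; 2) how SignalR uses cursors to keep track of clients. When a new message bus is initialized, none of the cursors from mb1 exist on mb2. That is why it starts working once the SignalR app pool is reset, and not before, which is obviously not a viable option. – ElHaix 2013-02-19 16:04:50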

Link describing how SignalR uses cursors: http://stackoverflow.com/questions/13054592/how-does-signalr-redis-work-under-the-hood/13063449#13063449 – ElHaix 2013-02-19 16:52:25

Answer

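The SignalR team has now implemented support for a custom connection factory with StackExchange.Redis, the successor to BookSleeve, which supports redundant Redis connections via ConnectionMultiplexer.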

The initial problem was that, despite creating my own extension methods in BookSleeve to accept a collection of servers, failover was not possible.

Now, with BookSleeve having evolved into StackExchange.Redis, we can configure a collection of servers/ports right in the Connect initialization.

The new approach is much simpler than the path I went down when creating the UseRedisCluster method, and the back-end plumbing now supports true failover:

var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true"); 

StackExchange.Redis also allows additional manual configuration, as outlined in the Automatic and Manual Configuration section of its documentation:

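using System; 
using System.Collections.Generic; 
using StackExchange.Redis; 

ConfigurationOptions config = new ConfigurationOptions 
{ 
    EndPoints = 
    { 
        { "redis0", 6379 }, 
        { "redis1", 6380 } 
    }, 
    CommandMap = CommandMap.Create(new HashSet<string> 
    { // EXCLUDE a few commands 
        "INFO", "CONFIG", "CLUSTER", 
        "PING", "ECHO", "CLIENT" 
    }, available: false), 
    KeepAlive = 180, 
    DefaultVersion = new Version(2, 8, 8), 
    Password = "changeme" 
}; 

// Connect with the options object instead of a configuration string
var conn = ConnectionMultiplexer.Connect(config); 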

In essence, the ability to initialize our SignalR scale-out environment with a collection of servers now solves the initial problem.

Should I reward your answer with the 500-rep bounty? ;) – nicael 2014-06-24 13:54:05

Well, if you think it is now *the answer* :) – ElHaix 2014-06-24 15:19:35

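@ElHaix Since you asked the question, you are probably best placed to say whether your answer is conclusive or only one piece of the puzzle. I suggest adding a sentence indicating whether, and how, it solves your problem – 2014-06-25 13:14:29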