2016-11-15 74 views
9

我创建了一个基于James Still博客文章Real-World PubSub Messaging with RabbitMQ的RabbitMQ订阅者的ASP.NET Core MVC/WebApi网站。从静态工厂类访问ASP.NET核心DI容器

在他的文章中,他使用静态类来启动队列订阅者并为排队事件定义事件处理程序。这个静态方法然后通过静态工厂类实例化事件处理程序类。

using RabbitMQ.Client; 
using RabbitMQ.Client.Events; 
using System; 
using System.Text; 

namespace NST.Web.MessageProcessing 
{ 
    public static class MessageListener 
    { 
     private static IConnection _connection; 
     private static IModel _channel; 

     public static void Start(string hostName, string userName, string password, int port) 
     { 
      var factory = new ConnectionFactory 
      { 
       HostName = hostName, 
       Port = port, 
       UserName = userName, 
       Password = password, 
       VirtualHost = "/", 
       AutomaticRecoveryEnabled = true, 
       NetworkRecoveryInterval = TimeSpan.FromSeconds(15) 
      }; 

      _connection = factory.CreateConnection(); 
      _channel = _connection.CreateModel(); 
      _channel.ExchangeDeclare(exchange: "myExchange", type: "direct", durable: true); 

      var queueName = "myQueue"; 

      QueueDeclareOk ok = _channel.QueueDeclare(queueName, true, false, false, null); 

      _channel.QueueBind(queue: queueName, exchange: "myExchange", routingKey: "myRoutingKey"); 

      var consumer = new EventingBasicConsumer(_channel); 
      consumer.Received += ConsumerOnReceived; 

      _channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); 

     } 

     public static void Stop() 
     { 
      _channel.Close(200, "Goodbye"); 
      _connection.Close(); 
     } 

     private static void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea) 
     { 
      // get the details from the event 
      var body = ea.Body; 
      var message = Encoding.UTF8.GetString(body); 
      var messageType = "endpoint"; // hardcoding the message type while we dev... 

      // instantiate the appropriate handler based on the message type 
      IMessageProcessor processor = MessageHandlerFactory.Create(messageType); 
      processor.Process(message); 

      // Ack the event on the queue 
      IBasicConsumer consumer = (IBasicConsumer)sender; 
      consumer.Model.BasicAck(ea.DeliveryTag, false); 
     } 

    } 
} 

它工作良好,直到现在我需要在消息处理器工厂中解析服务而不是写入控制台。

using NST.Web.Services; 
using System; 

namespace NST.Web.MessageProcessing 
{ 
    public static class MessageHandlerFactory 
    { 
     public static IMessageProcessor Create(string messageType) 
     { 
      switch (messageType.ToLower()) 
      { 
       case "ipset": 
        // need to resolve IIpSetService here... 
        IIpSetService ipService = ??????? 

        return new IpSetMessageProcessor(ipService); 

       case "endpoint": 
        // need to resolve IEndpointService here... 
        IEndpointService epService = ??????? 

        // create new message processor 
        return new EndpointMessageProcessor(epService); 

       default: 
        throw new Exception("Unknown message type"); 
      } 
     } 
    } 
} 

有什么办法可以访问ASP.NET Core IoC容器来解决依赖关系吗?或者,有没有更好的方式从ASP.NET Core应用程序订阅RabbitMQ?我发现RestBus,但它不是更新的核心1.x的

+0

你能转换了MessageListener成依赖,无论你有自己的依赖注入需要注入呢? – PmanAce

+0

我很好奇,下面的答案有帮助吗? –

回答

10

可以避开静态类和使用依赖注射一路结合:

  • 使用IApplicationLifetime每当应用程序启动/停止时启动/停止监听器。
  • 使用IServiceProvider来创建消息处理器的实例。

第一件事情,让我们的配置转移到自己的类,它可以从appsettings.json填充:

public class RabbitOptions 
{ 
    public string HostName { get; set; } 
    public string UserName { get; set; } 
    public string Password { get; set; } 
    public int Port { get; set; } 
} 

// In appsettings.json: 
{ 
    "Rabbit": { 
    "hostName": "192.168.99.100", 
    "username": "guest", 
    "password": "guest", 
    "port": 5672 
    } 
} 

接下来,转换MessageHandlerFactory成接收IServiceProvider作为一个非静态类依赖。它将使用服务提供商来解决消息处理器实例:

public class MessageHandlerFactory 
{ 
    private readonly IServiceProvider services; 
    public MessageHandlerFactory(IServiceProvider services) 
    { 
     this.services = services; 
    } 

    public IMessageProcessor Create(string messageType) 
    { 
     switch (messageType.ToLower()) 
     { 
      case "ipset": 
       return services.GetService<IpSetMessageProcessor>();     
      case "endpoint": 
       return services.GetService<EndpointMessageProcessor>(); 
      default: 
       throw new Exception("Unknown message type"); 
     } 
    } 
} 

这种方式(如你在Startup.ConfigureServices配置它们只要)你的消息处理器类可以在构造函数中的任何依赖关系,他们需要得到。例如,我注入ILogger到我的样品处理器之一:

public class IpSetMessageProcessor : IMessageProcessor 
{ 
    private ILogger<IpSetMessageProcessor> logger; 
    public IpSetMessageProcessor(ILogger<IpSetMessageProcessor> logger) 
    { 
     this.logger = logger; 
    } 

    public void Process(string message) 
    { 
     logger.LogInformation("Received message: {0}", message); 
    } 
} 

现在转换MessageListener成取决于IOptions<RabbitOptions>MessageHandlerFactory。它非常类似于原来的一个,我刚刚更换了一个非静态类与期权相关性和处理程序工厂的启动方法的参数现在是一个依赖,而不是一个静态类:

public class MessageListener 
{ 
    private readonly RabbitOptions opts; 
    private readonly MessageHandlerFactory handlerFactory; 
    private IConnection _connection; 
    private IModel _channel; 

    public MessageListener(IOptions<RabbitOptions> opts, MessageHandlerFactory handlerFactory) 
    { 
     this.opts = opts.Value; 
     this.handlerFactory = handlerFactory; 
    } 

    public void Start() 
    { 
     var factory = new ConnectionFactory 
     { 
      HostName = opts.HostName, 
      Port = opts.Port, 
      UserName = opts.UserName, 
      Password = opts.Password, 
      VirtualHost = "/", 
      AutomaticRecoveryEnabled = true, 
      NetworkRecoveryInterval = TimeSpan.FromSeconds(15) 
     }; 

     _connection = factory.CreateConnection(); 
     _channel = _connection.CreateModel(); 
     _channel.ExchangeDeclare(exchange: "myExchange", type: "direct", durable: true); 

     var queueName = "myQueue"; 

     QueueDeclareOk ok = _channel.QueueDeclare(queueName, true, false, false, null); 

     _channel.QueueBind(queue: queueName, exchange: "myExchange", routingKey: "myRoutingKey"); 

     var consumer = new EventingBasicConsumer(_channel); 
     consumer.Received += ConsumerOnReceived; 

     _channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); 

    } 

    public void Stop() 
    { 
     _channel.Close(200, "Goodbye"); 
     _connection.Close(); 
    } 

    private void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea) 
    { 
     // get the details from the event 
     var body = ea.Body; 
     var message = Encoding.UTF8.GetString(body); 
     var messageType = "endpoint"; // hardcoding the message type while we dev... 
     //var messageType = Encoding.UTF8.GetString(ea.BasicProperties.Headers["message-type"] as byte[]); 

     // instantiate the appropriate handler based on the message type 
     IMessageProcessor processor = handlerFactory.Create(messageType); 
     processor.Process(message); 

     // Ack the event on the queue 
     IBasicConsumer consumer = (IBasicConsumer)sender; 
     consumer.Model.BasicAck(ea.DeliveryTag, false); 
    } 
} 

快到了,你将需要更新Startup.ConfigureServices方法,因此它知道你的服务和选项(你可以创建接口如果你想监听和处理厂):

public void ConfigureServices(IServiceCollection services) 
{    
    // ... 

    // Add RabbitMQ services 
    services.Configure<RabbitOptions>(Configuration.GetSection("rabbit")); 
    services.AddTransient<MessageListener>(); 
    services.AddTransient<MessageHandlerFactory>(); 
    services.AddTransient<IpSetMessageProcessor>(); 
    services.AddTransient<EndpointMessageProcessor>(); 
} 

最后,更新Startup.Configure方法采取额外的IApplicationLifetime参数和启动/停止消息监听器在ApplicationStarted/ApplicationStopped事件(尽管我前一阵子一些注意使用IISExpress,如this question的ApplicationStopping事件)问题:

public MessageListener MessageListener { get; private set; } 
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IApplicationLifetime appLifetime) 
{ 
    appLifetime.ApplicationStarted.Register(() => 
    { 
     MessageListener = app.ApplicationServices.GetService<MessageListener>(); 
     MessageListener.Start(); 
    }); 
    appLifetime.ApplicationStopping.Register(() => 
    { 
     MessageListener.Stop(); 
    }); 

    // ... 
} 
+0

我不知道在这种情况下,如何将临时注册为transient并实现IDisposable的行为。在asp.net核心中,如果解决临时依赖关系 - 它将在请求完成后处理。但这里没有要求。 – Evk

+0

内置DI在终生管理等方面相对简单。在这种情况下,可能需要考虑挂钩第三方容器,例如Autofact,StructureMap,Unity等,并且为每个邮件创建一个范围 –

+0

是的,但是如果您不这样做并使用默认值,我希望它至少赢了不会被容器处理? – Evk

1

这是我对你的情况的意见:

如果可能的话我会派解决服务作为参数

public static IMessageProcessor Create(string messageType, IIpSetService ipService) 
{ 
    // 
} 

否则使用寿命将是重要的。

如果服务是单身,我将只设置配置方法的依赖:

// configure method 
public IApplicationBuilder Configure(IApplicationBuilder app) 
{ 
    var ipService = app.ApplicationServices.GetService<IIpSetService>(); 
    MessageHandlerFactory.IIpSetService = ipService; 
} 

// static class 
public static IIpSetService IpSetService; 

public static IMessageProcessor Create(string messageType) 
{ 
    // use IpSetService 
} 

如果使用寿命的作用范围我会用HttpContextAccessor:

//Startup.cs 
public void ConfigureServices(IServiceCollection services) 
{ 
    services.AddSingleton<IHttpContextAccessor, HttpContextAccessor>(); 
} 

public IApplicationBuilder Configure(IApplicationBuilder app) 
{ 
    var httpContextAccessor= app.ApplicationServices.GetService<IHttpContextAccessor>(); 
    MessageHandlerFactory.HttpContextAccessor = httpContextAccessor; 
} 

// static class 
public static IHttpContextAccessor HttpContextAccessor; 

public static IMessageProcessor Create(string messageType) 
{ 
    var ipSetService = HttpContextAccessor.HttpContext.RequestServices.GetService<IIpSetService>(); 
    // use it 
} 
2

即使使用依赖注入是一个更好的解决办法,但在某些情况下,你必须在扩展方法是使用静态方法(像)。

对于这些情况,您可以将静态属性添加到静态类并在ConfigureServices方法中初始化它。

例如:

public static class EnumExtentions 
{ 
    static public IStringLocalizerFactory StringLocalizerFactory { set; get; } 

    public static string GetDisplayName(this Enum e) 
    { 
     var resourceManager = StringLocalizerFactory.Create(e.GetType()); 
     var key = e.ToString(); 
     var resourceDisplayName = resourceManager.GetString(key); 

     return resourceDisplayName; 
    } 
} 

,并在您ConfigureServices:

EnumExtentions.StringLocalizerFactory = services.BuildServiceProvider().GetService<IStringLocalizerFactory>();