2017-10-13 138 views
0

我有2个系统,一个用于发布消息和其他消耗它们。两者都使用Masstransit(使用RabbitMQ),并使用ASP.Net web api 2和OWIN(以及Autofac作为IoC容器)来实现。 一切工作正常,如果我的消费者没有依赖关系,,但是当我注入一个依赖到我的消费者时,消费方法永远不会执行(在初始化期间没有错误抛出)。使用Autofac在Masstransit中注册一个IConsumer <T>

这是相关出版商代码:

//Startup.cs 
public class Startup 
{ 
    public void Configuration(IAppBuilder app) 
    { 
     HttpConfiguration config = new HttpConfiguration(); 

     IContainer container = null; 
     var builder = new ContainerBuilder(); 

     builder.Register(context => 
     { 
      var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => 
      { 
       IRabbitMqHost rabbitMqHost = cfg.Host(new Uri(ConfigurationManager.AppSettings["RabbitMQHost"]), settings => 
       { 
        settings.Username(ConfigurationManager.AppSettings["RabbitMQUser"]); 
        settings.Password(ConfigurationManager.AppSettings["RabbitMQPassword"]); 
       }); 
      }); 

      return busControl; 
     }) 
     .As<IBusControl>() 
     .As<IBus>() 
     .SingleInstance(); 

     // Register Web API controllers 
     builder.RegisterApiControllers(Assembly.GetExecutingAssembly()); 

     // Resolve dependencies 
     container = builder.Build(); 
     config.DependencyResolver = AutofacWebApiDependencyResolver(container); 

     WebApiConfig.Register(config); 
     SwaggerConfig.Register(config); 
     app.UseCors(CorsOptions.AllowAll); 

     // Register the Autofac middleware FIRST. 
     app.UseAutofacMiddleware(container); 
     app.UseWebApi(config); 

     // Starts MassTransit Service bus, and registers stopping of bus on app dispose 
     var bus = container.Resolve<IBusControl>(); 
     var busHandle = bus.StartAsync(); 
     var properties = new AppProperties(app.Properties); 
     if (properties.OnAppDisposing != CancellationToken.None) 
     { 
      properties.OnAppDisposing.Register(() => busHandle.Result.StopAsync(TimeSpan.FromSeconds(30))); 
     } 
    } 
} 

// Controller 
public IHttpActionResult Post() 
{ 
    _bus.Publish<IFooMessage>(new 
    { 
     Foo = "Foo" 
    }); 

    return Ok(); 
} 

这是相关消费者代码:

// Startup.cs 
public class Startup 
{ 
    public void Configuration(IAppBuilder app) 
    { 
     HttpConfiguration config = new HttpConfiguration(); 

     IContainer container = null; 
     var builder = new ContainerBuilder(); 

     builder.RegisterType<FooService>().As<IFooService>().InstancePerRequest(); 
     builder.RegisterModule<BusModule>(); 
     builder.RegisterModule<ConsumersModule>(); 

     // Register Web API controllers 
     builder.RegisterApiControllers(Assembly.GetExecutingAssembly()); 

     // Resolve dependencies 
     container = builder.Build(); 
     config.DependencyResolver = new AutofacWebApiDependencyResolver(container); 

     WebApiConfig.Register(config); 
     SwaggerConfig.Register(config); 
     app.UseCors(CorsOptions.AllowAll); 

     // Register the Autofac middleware FIRST. 
     app.UseAutofacMiddleware(container); 
     app.UseWebApi(config); 

     // Starts MassTransit Service bus, and registers stopping of bus on app dispose 
     var bus = container.Resolve<IBusControl>(); 
     var busHandle = bus.StartAsync(); 
     var properties = new AppProperties(app.Properties); 
     if (properties.OnAppDisposing != CancellationToken.None) 
     { 
      properties.OnAppDisposing.Register(() => busHandle.Result.StopAsync(TimeSpan.FromSeconds(30))); 
     } 
    } 
} 

// BusModule.cs 
public class BusModule : Module 
{ 
    protected override void Load(ContainerBuilder builder) 
    { 
     builder.Register(context => 
     { 
      var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => 
      { 
       IRabbitMqHost rabbitMqHost = cfg.Host(new Uri(ConfigurationManager.AppSettings["RabbitMQHost"]), settings => 
       { 
        settings.Username(ConfigurationManager.AppSettings["RabbitMQUser"]); 
        settings.Password(ConfigurationManager.AppSettings["RabbitMQPassword"]); 
       }); 
       cfg.ReceiveEndpoint(rabbitMqHost, "IP.AgilePoint.queue", ec => 
       { 
        ec.LoadFrom(context); 
       }); 
      }); 

      return busControl; 
     }) 
     .SingleInstance() 
     .As<IBusControl>() 
     .As<IBus>(); 
    } 
} 

// ConsumerModule.cs 
public class ConsumersModule : Module 
{ 
    protected override void Load(ContainerBuilder builder) 
    { 
     builder.RegisterType<FooConsumer>(); 
    } 
} 

// FooConsumer.cs 
public class FooConsumer : IConsumer<IFooMessage> 
{ 
    private IFooService _service; 

    public FooConsumer(IFooService service) 
    { 
     _service = service; 
    } 

    public Task Consume(ConsumeContext<IFooMessage> context) 
    { 
     IFooMessage @event = context.Message; 

     _service.DoStuff(@event.Foo); 

     return Task.FromResult(context.Message); 
    } 
} 

注意我FooConsumer具有相关性(构造函数)上IFooService。 我已经关注Masstransit documentation,但我无法得到这个工作。我究竟做错了什么?

框架版本:

  • .Net框架4.6.1
  • Autofac:3.5.2
  • Masstransit:3.5.7

更新时间:

代码可以发现in this Github repository

回答

0

最后我发现我在做什么错。出于某种原因,我无法在RabbitMQ Management中看到我的队列。当我已经能看到错误队列,我注意到德以下错误:

RabbitMQ Error

我登记我IFooService这样:

builder.RegisterType<FooService>().As<IFooService>().InstancePerRequest(); 

InstancePerRequest()是造成错误。如果我注册服务builder.RegisterType<FooService>().As<IFooService>()一切工作正常。我认为这是因为我的公交车实例是作为单身人士(登记为SingleIsntace())。使用web api项目托管我的公共汽车/消费者的事实让我感到困惑。

无论如何,感谢@Chris Patterson和@Alexey Zimarev的正确实施方向(使用MT扩展来注册用户)。

+1

每个请求注册只有在Autofac管理请求范围时才适用,并且这是在WebAPI集成插件中完成的。 MassTransit不使用它。默认注册是*不是单身*,它是每个依赖。这是正确的范围。 MassTransit为每条消息实例化一个消费者。 –

3

我建议查看特定于Autofac的文档,它是通过扩展库完全支持的容器。

http://masstransit-project.com/MassTransit/usage/containers/autofac.html

包装: https://www.nuget.org/packages/masstransit.autofac

+0

我觉得这是我在做什么,除非我正在做一个愚蠢的错误,我看不到...... 我FooConsumer构造函数永远不会被调用 –

+0

你问MT从上下文加载,但我不能查看使用Autofac注册用户的任何代码。如果他们没有注册,你如何期待他们解决? –