2017-08-24 85 views

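Rebus RabbitMq: unable to subscribe and retrieve item

I cannot retrieve an object once I have added it to the queue. The code that adds it is shown below. As far as I can tell it works fine, because the message does arrive in the queue.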

var connection = "amqp://name:[email protected]"; 
using (var activator = new BuiltinHandlerActivator()) 
{ 
    Configure.With(activator) 
     .Logging(l => l.ColoredConsole()) 
     .Transport(t => t.UseRabbitMqAsOneWayClient(connection).ExchangeNames(directExchangeName: "WamosExchange")) // .ExchangeNames(directExchangeName: "WamosExchange") 
     .Routing(r => r.TypeBased() 
      .Map<Wagon>("wagon_v1")) 
     .Start(); 

    var wagon = new Wagon 
    { 
     Token = Guid.Parse("3CCE443C-249F-4FD2-9882-5830FB308B6B"), 
     WagonId = Guid.Parse("A98A06AB-33B9-4A11-9DE2-DF0B8787B713"), 
     Description = "test", 
     WamosId = 12324, 
     YearBuilt = 1982 
    }; 

    await activator.Bus.Send(wagon); 

    Console.WriteLine("Done"); 
} 

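The code that retrieves it is below (this is the part that does not work). In RabbitMQ I am using an exchange together with the wagon_v1 queue. I can see that Rebus is creating another exchange, the one highlighted in red, which is all a bit confusing. I want it to use WamosExchange and the wagon_v1 queue.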


static BuiltinHandlerActivator activator = new BuiltinHandlerActivator(); 

static async Task MainSubscribeAsync() 
{ 

    var connection = "amqp://name:[email protected]"; 

    activator.Register(() => new WagonHandler()); 

    var bus = Configure.With(activator) 
     .Logging(l => l.ColoredConsole()) 
     .Transport(t => t.UseRabbitMq(connection, "wagon_v1") 
      .ExchangeNames(directExchangeName: "WamosExchange")) 
     .Routing(r => r.TypeBased() 
      .Map<Wagon>("wagon_v1")) 
     .Options(o => 
     { 
      o.SetMaxParallelism(10); 
      o.SetNumberOfWorkers(10); 
     }) 
     .Start(); 

    await bus.Subscribe<Wagon>(); 

    Console.WriteLine("Done"); 
} 

The handler is as follows:

class WagonHandler : IHandleMessages 
{ 
    public void Handle(Wagon message) 
    { 
     Console.WriteLine($"Token {message.Token}"); 
     Console.WriteLine($"WagonId {message.WagonId}"); 
    } 
} 

When I try to subscribe, all I get is the following:

Rebus.Retry.ErrorTracking.InMemErrorTracker WARN (Rebus 1 worker 4): Unhandled exception 1 while handling message with ID "f26daec4-48e2-47ca-a3fe-b4f18bc9b217" 
Rebus.Retry.ErrorTracking.InMemErrorTracker WARN (Rebus 1 worker 2): Unhandled exception 2 while handling message with ID "f26daec4-48e2-47ca-a3fe-b4f18bc9b217" 
Rebus.Retry.ErrorTracking.InMemErrorTracker WARN (Rebus 1 worker 8): Unhandled exception 3 while handling message with ID "f26daec4-48e2-47ca-a3fe-b4f18bc9b217" 
Rebus.Retry.ErrorTracking.InMemErrorTracker WARN (Rebus 1 worker 7): Unhandled exception 4 while handling message with ID "f26daec4-48e2-47ca-a3fe-b4f18bc9b217" 
Rebus.Retry.ErrorTracking.InMemErrorTracker WARN (Rebus 1 worker 6): Unhandled exception 5 while handling message with ID "f26daec4-48e2-47ca-a3fe-b4f18bc9b217" 
Rebus.Retry.PoisonQueues.PoisonQueueErrorHandler ERROR (Rebus 1 worker 5): Moving message with ID "f26daec4-48e2-47ca-a3fe-b4f18bc9b217" to error queue "error" 
System.AggregateException: 5 unhandled exceptions ---> Rebus.Exceptions.RebusApplicationException: Message with ID f26daec4-48e2-47ca-a3fe-b4f18bc9b217 and type ThreeSquared.VTGPAM.Objects.Wagon, ThreeSquared.VTGPAM.Objects could not be dispatched to any handlers 
    at Rebus.Pipeline.Receive.DispatchIncomingMessageStep.<Process>d__3.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 

Answer


As the error message states, this error occurs when no message handler can be found for Wagon.

Your handler

class WagonHandler : IHandleMessages 
{ 
    public void Handle(Wagon message) 
    { 
     Console.WriteLine($"Token {message.Token}"); 
     Console.WriteLine($"WagonId {message.WagonId}"); 
    } 
} 

cannot be found by Rebus, because Rebus looks for an implementation of IHandleMessages<Wagon>, and you have only implemented the base interface IHandleMessages.
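
To make the mismatch concrete, here is a minimal sketch that does not reference Rebus at all. The two interfaces are local stand-ins that only mirror the shape of the marker interface and the generic interface, and `CanHandle`, `MarkerOnlyHandler` and `TypedHandler` are hypothetical names invented for this illustration. It is not how Rebus resolves handlers internally; it only shows that a type check against the generic interface fails for a class that implements just the marker.

```csharp
using System;
using System.Threading.Tasks;

// Local stand-ins (assumption): they imitate a marker interface plus a generic
// handler interface so the sketch compiles without any Rebus reference.
public interface IHandleMessages { }

public interface IHandleMessages<in TMessage> : IHandleMessages
{
    Task Handle(TMessage message);
}

public class Wagon
{
    public Guid WagonId { get; set; }
}

// Implements only the marker interface, like the handler in the question.
// Its Handle method is just an ordinary method that no interface requires.
public class MarkerOnlyHandler : IHandleMessages
{
    public void Handle(Wagon message) { }
}

// Implements the generic interface, so it is discoverable by message type.
public class TypedHandler : IHandleMessages<Wagon>
{
    public Task Handle(Wagon message) => Task.CompletedTask;
}

public static class Program
{
    // Hypothetical lookup: can this object handle messages of type TMessage?
    public static bool CanHandle<TMessage>(object handler) =>
        handler is IHandleMessages<TMessage>;

    public static void Main()
    {
        Console.WriteLine(CanHandle<Wagon>(new MarkerOnlyHandler())); // False
        Console.WriteLine(CanHandle<Wagon>(new TypedHandler()));      // True
    }
}
```

The marker-only class compiles without complaint, which is why the problem only shows up at runtime as "could not be dispatched to any handlers".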

Implementing IHandleMessages<Wagon> requires you to change the method signature slightly; it should look like this:

class WagonHandler : IHandleMessages<Wagon> 
{ 
    public async Task Handle(Wagon message) 
    { 
     Console.WriteLine($"Token {message.Token}"); 
     Console.WriteLine($"WagonId {message.WagonId}"); 
    } 
}
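
Because the handler above is marked `async` but never awaits anything, the compiler will most likely warn about it (CS1998). One possible variation, sketched here under the assumption that the handler really has no asynchronous work to do, is to drop `async` and return `Task.CompletedTask`. The `IHandleMessages<TMessage>` interface below is again a local stand-in so the snippet compiles on its own, and `Demo` is a hypothetical entry point; in a real project you would implement the interface that Rebus provides instead.

```csharp
using System;
using System.Threading.Tasks;

// Local stand-in (assumption) for the generic handler interface,
// declared here only so the sketch is self-contained.
public interface IHandleMessages<in TMessage>
{
    Task Handle(TMessage message);
}

public class Wagon
{
    public Guid Token { get; set; }
    public Guid WagonId { get; set; }
}

public class WagonHandler : IHandleMessages<Wagon>
{
    // No 'async' keyword: the work is synchronous, so an already
    // completed task is returned to satisfy the Task-returning signature.
    public Task Handle(Wagon message)
    {
        Console.WriteLine($"Token {message.Token}");
        Console.WriteLine($"WagonId {message.WagonId}");
        return Task.CompletedTask;
    }
}

public static class Demo
{
    public static async Task Main()
    {
        var wagon = new Wagon { Token = Guid.NewGuid(), WagonId = Guid.NewGuid() };

        // Call the handler directly, the way a dispatcher would after deserializing a message.
        await new WagonHandler().Handle(wagon);
    }
}
```

Both forms satisfy the same interface. Keeping `async Task` is the more convenient choice as soon as the handler needs to await something, such as a database call.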