2017-07-13 47 views
1

我在尝试中止消息处理程序中的事务时很挣扎。我正在使用RabbitMQ如何中止rebus中的交易?

我的目标是具有以下行为:如果收到消息,我尝试将其内容存储在硬盘上。如果失败了,我想重新输入邮件。通过这样做,我给同一服务的另一个实例提供了尝试相同的机会。 我想要的基本上是控制消息何时被编辑或拒绝的可能性。

我已经查看了源代码,特别是RabbitMqTransport.cs,发现当交易提交时发送了一个ACK。如果交易被中止,则发送NACK。我曾经自己围绕RabbitMQ创建了一个包装类,因此知道这是正确的。

但是,似乎OnAborted永远不会被调用。即使我放弃交易,也会调用OnComitted

我使用以下代码来中止事务。 contextIMessageContext实例传入Messagehandler

context.TransactionContext.OnAborted(() => 
{ 
    Console.WriteLine("Abort"); 
}); 

context.TransactionContext.OnCommitted(async() => 
{ 
    Console.WriteLine("Commit"); 
}); 

context.TransactionContext.Abort(); 

我也试过的这个不同的变化,如获得AmbientTransactionContext或使用Rebus.TransactionScope包,没有效果。

回答

0

根据消息处理程序是否引发异常,Rebus被设计为可以处理ACK/NACK。

换句话说,如果您的消息处理程序不会引发异常,则认为消息传递成功,并且消息被确认。

另一方面,如果您的消息处理程序抛出异常,则消息将被NACK(并因此将其状态重置为RabbitMQ中的“就绪”),从而有效地使其他实例可以获取该消息。

但是,由于RabbitMQ客户端驱动程序的设计方式,我相信该消息实际上不会返回到服务器以供其他实例接收 - 我认为(这只是一个猜测),事​​实驱动程序预取消息并将它们存储在内存队列中,直到它们被消耗,导致消息被简单地标记为内部重新传送。

因此,我希望相同的服务实例执行所有的传递尝试,然后 - 正如您已经正确观察到的那样 - Rebus将消息移动到错误队列中,从而安全地存储它以便稍后处理。

我希望是有道理的:)

0

从您的文章中不清楚的是如何中止交易。我们使用与RabbitMq类似的设置,我总是从处理程序内部抛出一个异常。

+0

我通过从传入处理程序并调用终止处理的IMessageContext中取出TransactionContext来终止事务,如我的代码示例所示。我也尝试过抛出异常,但最终导致Rebus在一段时间后将消息移入错误队列。如果由于某种原因,当时没有其他相同服务的实例正在运行,就会发生这种情况。 –

0

@ mookid8000: 好吧,这使得它十分清楚,当消息ACK版或NACK版。

但是我能够反驳你的假设。我在单独的控制台应用程序中绑定到相同的输入队列创建了两个使用者其中一个总是抛出异常。发布者每10秒发送一条消息。我可以看到,其他使用者在故障抛出异常之后处理消息。我明白了,因为消息的ID是由Rebus在控制台中记录的。

这回答我的问题,但没有解决我的问题。就我而言,我真正想要的是可以让消息保留在队列中,直到其中一个服务实例能够处理它为止。原因是消息的顺序很重要。这可能是我的方法中的一个根本性错误,但现在我不想改变这一点。

是否有办法阻止(某些)消息移动到错误队列? Rebus的二级重试机制是否可以实现这一目标?

有关进一步参考下面的源代码:

普通消费者:

class TimeEventHandler : IHandleMessages<TimeEvent> 
{ 
    public async Task Handle(TimeEvent message) 
    { 
     Console.WriteLine(message.Time); 

     //await Program.Activator.Bus.Reply("Ja danke"); 
    } 
} 

class Program 
{ 
    public static BuiltinHandlerActivator Activator; 

    static void Main(string[] args) 
    { 
     using (Activator = new BuiltinHandlerActivator()) 
     { 
      Activator.Register(() => new TimeEventHandler()); 

      var bus = Configure 
       .With(Activator) 
       .Transport(t => t.UseRabbitMq("amqp://guest:[email protected]", 
        $"ConsumerPrototype").Prefetch(1)) 
       .Routing(r => r.TypeBased()) 
       .Start(); 

      bus.Subscribe<TimeEvent>(); 

      Console.WriteLine("Press enter to quit"); 
      Console.ReadLine(); 

      // Without the unsubscribe we have a durable subscriber, see http://www.enterpriseintegrationpatterns.com/patterns/messaging/DurableSubscription.html 
      //bus.Unsubscribe<TimeEvent>(); 
     } 
    } 
} 

故障消费者:

class FaultedTimeEventHandler : IHandleMessages<TimeEvent> 
{ 
    public async Task Handle(TimeEvent message) 
    { 
     throw new Exception("That should not have happened"); 
    } 
} 

class Program 
{ 
    public static BuiltinHandlerActivator Activator; 

    static void Main(string[] args) 
    { 
     using (Activator = new BuiltinHandlerActivator()) 
     { 
      Activator.Register(() => new FaultedTimeEventHandler()); 

      var bus = Configure 
       .With(Activator) 
       .Transport(t => t.UseRabbitMq("amqp://guest:[email protected]", 
        $"ConsumerPrototype").Prefetch(1)) 
       .Routing(r => r.TypeBased()) 
       .Start(); 

      bus.Subscribe<TimeEvent>(); 

      Console.WriteLine("Press enter to quit"); 
      Console.ReadLine(); 
     } 
    } 
} 

出版商:

public static class PubSubTest 
{ 
    public static void Start() 
    { 
     Console.WriteLine("Starting PubSubTest"); 

     using (var activator = new BuiltinHandlerActivator()) 
     { 
      var bus = Configure 
       .With(activator) 
       .Transport(t => t.UseRabbitMq("amqp://guest:[email protected]", "MessagingTest").Prefetch(1)) 
       .Routing(r => r.TypeBased()) 
       .Start(); 

      Observable 
       .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10)) 
       .Subscribe(_ => bus.Publish(new TimeEvent(DateTime.Now)).Wait()); 

      Console.WriteLine("Press enter to quit"); 
      Console.ReadLine(); 
     } 
    } 
} 

更新26.07.17: 我的结果与二级重试:激活它们后,我通过推迟和重新发送它们来处理它们。通过这样做,我至少可以确保消息稍后处理,而不会丢失。

public async Task Handle(IFailed<TimeEvent> failedMessage) 
{ 
    await bus.Defer(TimeSpan.FromSeconds(30), failedMessage.Message); 
} 

这不是一个最佳的解决方案:

  1. 顺序我的消息被改变:消息被延迟后,下一个消息从队列消耗。
  2. Timeoutmanager是内存中的,我无权访问SQL-Server。

我读到可以在RabbitMQ中使用延迟消息。您可以使用dead-letter-exchangesplugin

将maxDeliveryAttempts增加到Int32.MaxValue非常肮脏,但按照我的要求:它保留消息,最重要的是消息在队列中的顺序。

我将此问题标记为已解决,因为“如何在Rebus中中止交易”这个问题已得到解答。