@ mookid8000: 好吧,这使得它十分清楚,当消息ACK
版或NACK
版。
但是我能够反驳你的假设。我在单独的控制台应用程序中绑定到相同的输入队列创建了两个使用者其中一个总是抛出异常。发布者每10秒发送一条消息。我可以看到,其他使用者在故障抛出异常之后处理消息。我明白了,因为消息的ID是由Rebus在控制台中记录的。
这回答我的问题,但没有解决我的问题。就我而言,我真正想要的是可以让消息保留在队列中,直到其中一个服务实例能够处理它为止。原因是消息的顺序很重要。这可能是我的方法中的一个根本性错误,但现在我不想改变这一点。
是否有办法阻止(某些)消息移动到错误队列? Rebus的二级重试机制是否可以实现这一目标?
有关进一步参考下面的源代码:
普通消费者:
class TimeEventHandler : IHandleMessages<TimeEvent>
{
public async Task Handle(TimeEvent message)
{
Console.WriteLine(message.Time);
//await Program.Activator.Bus.Reply("Ja danke");
}
}
class Program
{
public static BuiltinHandlerActivator Activator;
static void Main(string[] args)
{
using (Activator = new BuiltinHandlerActivator())
{
Activator.Register(() => new TimeEventHandler());
var bus = Configure
.With(Activator)
.Transport(t => t.UseRabbitMq("amqp://guest:[email protected]",
$"ConsumerPrototype").Prefetch(1))
.Routing(r => r.TypeBased())
.Start();
bus.Subscribe<TimeEvent>();
Console.WriteLine("Press enter to quit");
Console.ReadLine();
// Without the unsubscribe we have a durable subscriber, see http://www.enterpriseintegrationpatterns.com/patterns/messaging/DurableSubscription.html
//bus.Unsubscribe<TimeEvent>();
}
}
}
故障消费者:
class FaultedTimeEventHandler : IHandleMessages<TimeEvent>
{
public async Task Handle(TimeEvent message)
{
throw new Exception("That should not have happened");
}
}
class Program
{
public static BuiltinHandlerActivator Activator;
static void Main(string[] args)
{
using (Activator = new BuiltinHandlerActivator())
{
Activator.Register(() => new FaultedTimeEventHandler());
var bus = Configure
.With(Activator)
.Transport(t => t.UseRabbitMq("amqp://guest:[email protected]",
$"ConsumerPrototype").Prefetch(1))
.Routing(r => r.TypeBased())
.Start();
bus.Subscribe<TimeEvent>();
Console.WriteLine("Press enter to quit");
Console.ReadLine();
}
}
}
出版商:
public static class PubSubTest
{
public static void Start()
{
Console.WriteLine("Starting PubSubTest");
using (var activator = new BuiltinHandlerActivator())
{
var bus = Configure
.With(activator)
.Transport(t => t.UseRabbitMq("amqp://guest:[email protected]", "MessagingTest").Prefetch(1))
.Routing(r => r.TypeBased())
.Start();
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10))
.Subscribe(_ => bus.Publish(new TimeEvent(DateTime.Now)).Wait());
Console.WriteLine("Press enter to quit");
Console.ReadLine();
}
}
}
更新26.07.17: 我的结果与二级重试:激活它们后,我通过推迟和重新发送它们来处理它们。通过这样做,我至少可以确保消息稍后处理,而不会丢失。
public async Task Handle(IFailed<TimeEvent> failedMessage)
{
await bus.Defer(TimeSpan.FromSeconds(30), failedMessage.Message);
}
这不是一个最佳的解决方案:
- 顺序我的消息被改变:消息被延迟后,下一个消息从队列消耗。
- Timeoutmanager是内存中的,我无权访问SQL-Server。
我读到可以在RabbitMQ中使用延迟消息。您可以使用dead-letter-exchanges或plugin。
将maxDeliveryAttempts增加到Int32.MaxValue
非常肮脏,但按照我的要求:它保留消息,最重要的是消息在队列中的顺序。
我将此问题标记为已解决,因为“如何在Rebus中中止交易”这个问题已得到解答。
我通过从传入处理程序并调用终止处理的IMessageContext中取出TransactionContext来终止事务,如我的代码示例所示。我也尝试过抛出异常,但最终导致Rebus在一段时间后将消息移入错误队列。如果由于某种原因,当时没有其他相同服务的实例正在运行,就会发生这种情况。 –