2016-11-30 237 views
1

我试图将消息从错误队列移回到它起源的队列。 为此,我在错误队列上创建了一个使用者,然后将其发布到必需队列中。 当我尝试这样做时,消费消息的一半被发布,但另一半被发送到Error_Skipped队列。MassTransit RabbitMQ将错误队列上消费的消息的一半移动到Error_Skipped队列

我已经尝试过许多事情没有成功,所以它可能是简单的,我失踪了。

这里是我的代码示例:

public class ClaimsMessage 
{ 
    public string Description { get; set; } 

    public DateTime Date { get; set; } 

    public bool Handled { get; set; } 
} 

public class ClaimsMessageErrorConsumer : IConsumer<Fault<ClaimsMessage>> 
{ 
    public async Task Consume(ConsumeContext<Fault<ClaimsMessage>> context) 
    { 
     try 
     { 
      await context.Publish<ClaimsMessage>(context.Message.Message); 

     } 
     catch (Exception e) 
     { 
      string error = e.Message; 
     } 
    } 
} 

public static IBusControl CreateClaimsErrorConsumerBus(string endPoint) 
{ 
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => 
    { 
     var host = cfg.Host(new Uri("rabbitmq://localhost/"), h => 
     { 
      h.Username("guest"); 
      h.Password("guest"); 
     }); 

     cfg.ReceiveEndpoint(host, endPoint, e => 
     { 
      e.Consumer(() => new ClaimsMessageErrorConsumer()); 
     }); 
    }); 
    return busControl; 
} 
+0

你有没有考虑使用[铲子?](https://www.rabbitmq.com/shovel.html) – stuartd

+0

我看过一铲。但它有点基础。 我想添加规则,这将允许我们只能将某些消息再次移回,稍后再移回其他消息。 –

+0

可以试试[邮件列表](https://groups.google.com/forum/#!forum/masstransit-discuss) – stuartd

回答

1

如果是从一个错误队列回处理队列移动邮件,你不应该调用Publish - 这将重新发送邮件给所有的用户。您已经知道队列名称,因此将消息直接发送回队列。你所看到的是,你已经在错误队列上创建了一个消费者,它为该消息创建了一个交换绑定。

那么,这样做,而不是:

sbc.ReceiveEndpoint("input_error", x => 
{ 
    // this prevents extra message bindings from being created 
    x.BindMessageExchanges = false; 

    x.Consumer<MyMover>(() => new MyMover(inputQueueAddress); 
}); 

public class MyMover : 
    IConsumer<ClaimsMessage> 
{ 
    public async Task Consume(ConsumeContext<ClaimsMessage> context) 
    { 
     try 
     { 
      var endpoint = await context.GetSendEndpoint(_inputQueueAddress); 
      await endpoint.Send<ClaimsMessage>(context.Message); 
     } 
     catch (Exception e) 
     { 
      string error = e.Message; 
     } 
    } 
} 

对于额外的信用,拷贝过来的原始邮件头,因此消息的保真度被保留。

+0

谢谢你的回复。 不幸的是,它仍然是这样做的,一条消息放在Claim Queue上,另一条放在Claims_error_skipped队列上。 虽然我没有复制标题。你认为这可能导致它? –

+0

嗨克里斯。 我已经创建了一个示例项目,可以在这里下载: https://drive.google.com/file/d/0B0FYiKs0DMyrYTJfZTcxdVlKSDg/view?usp=sharing[link] 它包括我如何复制它的分步说明。 我完全可能错过了一些东西,但我已经阅读了相当广泛的文档。 –

+0

如果您正在从错误队列中读取消息,则应使用原始消息,而不是错误。实际上,您已在错误队列中为故障创建了额外的使用者绑定,除了从队列中移出的原始消息之外,它还将获取已发布的错误。消耗T代替,并清理RMQ中的绑定,并且你全部设置好了。 –