2016-09-29 72 views
2

我正在尝试在MassTransit v3中使用C#和RabbitMQ实现只发布总线,其中总线没有使用者。这个概念是消息将被发布和排队,然后一个单独的微服务将消耗队列中的消息。看看this SO answer,必须指定接收端点,以便消息实际排队。但是,这似乎是contradict the common gotchas in the MassTransit docs,其中指出If you need to only send or publish messages, don’t create any receive endpoints使用C#和RabbitMQ在MassTransit v3中实现只发布总线

下面是一些示例代码:

public class Program 
    { 
     static void Main(string[] args) 
     { 
      var bus = BusConfigurator.ConfigureBus(); 

      bus.Start(); 

      bus.Publish<IItemToQueue>(new ItemToQueue { Text = "Hello World" }).Wait(); 

      Console.ReadKey(); 

      bus.Stop(); 
     } 
    } 

    public static class BusConfigurator 
    { 
     public static IBusControl ConfigureBus() 
     { 
      var bus = Bus.Factory.CreateUsingRabbitMq(cfg => 
      { 
       var host = cfg.Host(new Uri("rabbitmq://localhost/"), hst => 
       { 
        hst.Username("guest"); 
        hst.Password("guest"); 
       }); 

       cfg.ReceiveEndpoint(host, "queuename", e => 
       { 
        e.Consumer<MyConsumer>(); 
       }); 
      }); 

      return bus; 
     } 
    } 

    public interface IItemToQueue 
    { 
     string Text { get; set; } 
    } 

    public class ItemToQueue : IItemToQueue 
    { 
     public string Text { get; set; } 
    } 

    public class MyConsumer : IConsumer<IItemToQueue> 
    { 
     public async Task Consume(ConsumeContext<IItemToQueue> context) 
     { 
      await Console.Out.WriteLineAsync(context.Message.Text); 
     } 
    } 

在此示例中,我收到了RabbitMQ的队列中的消息符合市场预期,这是由MyConsumer它写的Hello World控制台消耗,消息随后从队列中删除。

然而,当我从上面除去下面的代码并重新运行样品:

cfg.ReceiveEndpoint(host, RabbitMqConstants.ValidationQueue, e => 
{ 
    e.Consumer<MyConsumer>(); 
}); 

临时队列创建(具有生成的名称)和消息似乎从未被放置到临时队列。当总线停止时,该队列将被删除。

我遇到的问题是指定了ReceiveEndpoint,消息将从发布程序中的队列中消耗并被删除(意味着消费者微服务不会处理排队的项目)。没有指定RecieveEndpoint,就会使用一个临时队列(并且消费者微服务不知道这个临时队列的名称),这个消息似乎永远不会排队,并且当总线停止时队列被删除,如果不是该程序失败了。

an example of a send only bus in the MassTransit docs但它是非常基本的,所以我想知道如果有人有任何建议?

+0

虽然您不需要发布者/发件人的接收端点,但确实需要它们在某个地方,否则就没有绑定到消息交换的队列,而且这些消息也不会路由到任何地方 - 从而消失。 –

回答

1

接收端点应位于您的服务中,与发布应用程序分开。这样,该服务将具有接收端点并在应用程序发布它们时使用这些消息。

如果您在应用程序中有接收端点,则应用程序将使用消息,因为它具有在接收端点中指定的相同队列名称。

你需要做的是创建另一个服务,使用相同的配置(包括接收端点) - 并将接收端点从应用程序中取出。此时,该服务将拥有接收端点并消耗队列中的消息。即使服务停止,邮件仍会继续传送到队列中,一旦服务启动,它们将开始消费。

+0

这只是一个让RabbitMQ成立的问题吗?我处于类似的情况,我需要保证接收方队列在应用程序开始发送消息时立即接收消息,而不管该服务在当时是否已启动并正在运行,只要它们正在保存并最终得到处理。如果一旦运行服务设置了满足此要求的事情,那没问题。这是文档可以使用某些示例的一个区域。假设这有效,我会很高兴写出来。 –

+0

在GitHub中有一个问题,实质上是一个不启动,但在RabbitMQ中创建拓扑。自己接线的问题是,类型系统中细微的细微差别可能无法正确手动设置,从而导致启动时出现错误。但是,如果要发送到特定队列,则会有查询字符串参数供发件人将交换机绑定到队列。它是bind或bindQueue = true。 –

+0

谢谢。你能提供问题链接吗?今天早上我终于可以做些事情了。一旦我和一位消费者开始合作,我就可以将其删除,并将消息保留在随后的运行中。我处于开发/探索模式,显然不是一个生产计划:)你提出了一些关于我一直在考虑的类型和版本信息的问题,但是这是另一个帖子的主题,我玩了更多 –