2016-04-28 84 views
1

MassTransit的新手,仍然在玩一些教程项目。我将会有一个服务可能会运行20分钟,并且在完成时我需要做一些事情。因为它可能需要很长时间,我不想跟随请求/响应模式并等待响应,并阻止线程。我认为我的另一种选择是创建另一个队列,供消费者在完成作业时发布。我看过这篇文章:MassTransit3 how to make request from consumer,但我不知道如何实现这一点。我的项目,再从this教程,看起来是这样的:MassTransit:在消费者中创建回叫队列

出版商:

static void Main(string[] args) 
{ 
    var bus = Bus.Factory.CreateUsingRabbitMq(x => 
     x.Host(new Uri("rabbitmq://localhost/"), h => {})); 
    var busHandle = bus.Start(); 
    var text = ""' 
    Console.WriteLine("Publisher"); 
    while(text != "quit") 
    { 
     Console.Write("Enter a message: "); 
     text = Console.ReadLine(); 

     var message = new SomethingHappenedMessage() 
     { 
      What = text, 
      When = DateTime.Now 
     } 
     bus.Publish(message); 
    } 
    busHandle.Stop(); 
} 

认购人:

static void Main(string[] args) 
{ 
    var bus = Bus.Factory.CreateUsingRabbitMq(x => 
    { 
     var host = x.Host(new Uri("rabbitmq://localhost"), h => {}); 
     x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e => 
      e.Consumer<SomethingHappenedConsumer>()); 
    }); 
    Console.WriteLine("Subscriber"); 
    var busHandle = bus.Start(); 
    Console.ReadKey(); 
    busHandle.Stop(); 
} 

消费者:

class SomethingHappenedConsumer : IConsumer<ISomethingHappened> 
{ 
    public Task Consume(ConsumeContext<ISomethingHappened> context) 
    { 
     Console.Write("TXT: " + context.Message.What); 
     Console.Write(" SENT: " + context.Message.When); 
     Console.Write(" PROCESSED: " + DateTime.Now); 
     Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")"); 
     return Task.FromResult(0); 
    } 
} 

我将如何去创造一个消费者中的回调队列?

回答

1

在您的消费者中,只需Bus.Publish(new ResponseMessage());(或任何您称之为您的回复)并让您的发布者为该消息类型注册消费者。您的发布者似乎没有绑定到队列,只需组成队列名称并将其绑定到队列即可。

1

再次感谢@Travis寻求帮助。只是想展示最终的代码,我最终将为任何人在未来。消息看起来很有趣,但它正确回复给发布者。

出版商:

static void Main(string[] args) 
{ 
    var bus = Bus.Factory.CreateUsingRabbitMq(x => 
    { 
     var host = x.Host(new Uri("rabbitmq://localhost/"), h => { }); 
     x.ReceiveEndpoint(host, "MtPubSubExample_TestPublisher", e => 
      e.Consumer<ResponseConsumer>()); 
    }); 
    var busHandle = bus.Start(); 
    var text = ""; 
    Console.WriteLine("Publisher"); 
    while(text != "quit") 
    { 
     Console.Write("Enter a message: "); 
     text = Console.ReadLine(); 

     var message = new SomethingHappenedMessage() 
     { 
      What = text, 
      When = DateTime.Now 
     }; 
     bus.Publish(message); 
    } 

    busHandle.Stop(); 
} 

响应消费者:

class ResponseConsumer : IConsumer<IResponse> 
{ 
    public Task Consume(ConsumeContext<IResponse> context) 
    { 
     Console.WriteLine("RESPONSE MESSAGE: " + context.Message.Message); 
     return Task.FromResult(0); 
    } 
} 

认购人:

static void Main(string[] args) 
{ 
    var bus = Bus.Factory.CreateUsingRabbitMq(x => 
    { 
     var host = x.Host(new Uri("rabbitmq://localhost/"), h => { }); 
     x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e => 
      e.Consumer<SomethingHappenedConsumer>()); 
    }); 
    Console.WriteLine("Subscriber"); 
    var busHandle = bus.Start(); 
    Console.ReadKey(); 
    busHandle.Stop(); 
} 

用户消费:

class SomethingHappenedConsumer : IConsumer<ISomethingHappened> 
{ 
    private IBusControl bus = Bus.Factory.CreateUsingRabbitMq(x => 
     x.Host(new Uri("rabbitmq://localhost/"), h => { })); 

    public Task Consume(ConsumeContext<ISomethingHappened> context) 
    { 
     var now = DateTime.Now; 
     Console.Write("TXT: " + context.Message.What); 
     Console.Write(" SENT: " + context.Message.When); 
     Console.Write(" PROCESSED: " + now); 
     Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")"); 

     var response = new ResponseMessage() 
     { 
      Message = "The request was processed at " + now 
     }; 

     bus.Publish(response); 
     return Task.FromResult(0); 
    } 
}