2017-04-19 56 views
0

我正在处理一些POC project并尝试解决以下问题。包含来自消费者的ACK的较小任务的任务

我有一个Publisher它发送一个消息队列:

bus.PublishAsync<IBaseScenario>(new TestScenario()) 
      .ContinueWith(task => 
      { 
       if (task.IsCompleted && !task.IsFaulted) 
        Console.WriteLine("TestScenario queued with success."); 
       else 
        Console.WriteLine(task.Exception.Message); 
      }); 

而且有些Consumers被消耗的消息:

bus.SubscribeAsync<IBaseScenario>("test_1_consumer", 
      message => Task.Factory.StartNew(() => 
      { 
       var testScenario = message as TestScenario; 
       var anotherTestScenario = message as AnotherTestScenario; 

       ResolveScenario(testScenario); 
       ResolveScenario(anotherTestScenario); 

      }).ContinueWith(task => 
      { 
       if (task.IsCompleted && !task.IsFaulted) 
        Console.WriteLine("Task ended up with success."); 
       else 
        Console.WriteLine(task.Exception.Message); 
      })); 

此时一切工作的需要,但这是我想要实现的。

我的Message是某种场景,其中包含步骤,每个场景发送到队列,然后由消费者维护。

  1. 我想,当每一步都是在消费现场完成的(例如,如果其最终成功与否,以获得某种从消费者ACK信息发送给出版商每次。

  2. 我还想得到一个信息哪些消费者得到的消息。

每一封邮件(方案)应该被视为原子操作,所以不应该有可能做不同的消费者,如果一些步骤步骤将结束wi如果成功,那么整个场景应该被视为失败。

使用以下架构解决这两个需求是可能的吗?还是需要使用更多的东西?

+0

准备好使用基于上述要求的项目已发布并可在此处获得:https://github.com/kownet/Station/releases/tag/0.2如果有人有兴趣 –

回答

0

做的是使用这里https://github.com/EasyNetQ/EasyNetQ/wiki/Request-Response

描述EasyNetQ的请求响应模型在响应你可以把处理的消息和情景的最终地位的消费者的身份的最简单的事情。如果一个消息发送了一个场景,并且该场景包含了所有必需的步骤,那么所有步骤都将由单个消费者处理。

也就是说,消息重复始终是一个问题,因为要么发送消息两次,要么在消费者失败后重新发送消息。如果一个场景永远不会被多次处理至关重要,那么您将需要实施消息重复数据删除或使每个场景都是幂等的。在与RabbitMQ合作时,这是一个普遍的事实。

+0

当然,我也发现这会是最好的解决方案,我已经在这里实施它:https://github.com/kownet/Station/commit/b9dc4ac07467cfc6b8e74f495bcee2bbd1b9ee2f –