2016-11-19 59 views
1

Tree Structure卤面/基于内容的路由

正如你看到有一个总部作为根节点和一些分支作为子节点。有数据类型的消息,我要发布基于数据对象的内容的信息,例如:

if (data.value == xxxx) publish(data, Br1, Br2) 
else if (data.value == yyyy) publish(data, Br3, Br4) 
else if (data.value == zzzz) publis(data, Br5, Br6) 

这是某种定制的发布/订阅模式的版本。但是我想根据消息的内容将Data类型的消息发布给一些特殊的订阅者。

Rebus有解决方案吗?

回答

0
static void Main() 
    { 

     using (var activator = new BuiltinHandlerActivator()) 
     { 
      activator.Handle<Packet>(async (bus, packet) => 
      { 
       string subscriber = "subscriberA"; 
       await bus.Advanced.TransportMessage.Forward(subscriber); 
      }); 

      Configure.With(activator) 
       .Logging(l => l.ColoredConsole(minLevel: LogLevel.Warn)) 
       .Transport(t => t.UseMsmq("router")) 
       .Start(); 

      for (int i = 0; i < 10; i++) 
      { 
       activator.Bus.SendLocal(
        new Packet() 
        { 
         ID = i, 
         Content = "content" + i.ToString(), 
         Sent = false, 
        }).Wait(); 
      } 
     } 

     Console.ReadLine(); 
    } 
+0

由于您很早就抛弃了'BuiltinHandlerActivator',所以您会得到'NullReferenceException',如果''Console.ReadLine();'存在for'''循环后面,您将会得到另一个错误, “subscriberA”队列不存在 – mookid8000

+0

愚蠢的我。谢谢。 :) –

+0

是:)你可以阅读关于它[在这里关于交易的维基页面](https://github.com/rebus-org/Rebus/wiki/Transactions) – mookid8000

0

有在卤面的几种解决方案:)

对于您的情况,我可以看到解决它的方法有两种:1)使用自定义主题,或2)实现真正的基于内容的路由器。

如果有意义的话,可以使用Rebus的主题API来模拟此发布/订阅方案,以关注路由。如果你可以说你的数据信息属于某个类别,那么你的用户就可以订阅,这是有道理的。

与基于主题的“真实”排队系统相比, RabbitMQ,Rebus中的主题API非常粗糙。它不允许使用通配符(*)或任何类似高级的主题 - 主题只是可以订阅的简单字符串,然后用作发布/订阅渠道以将事件路由到多个订阅者。

您可以在用户的​​最终使用它像这样:

await bus.Advanced.Topics.Subscribe("department_a"); 

,然后在出版商的结尾:

var data = new Data(...); 

await bus.Advanced.Topics.Publish("department_a", data); 

如果不剪,你可以插入一个“真正的“基于内容的路由器,它仅仅是一个端点,它将您的消息转发给相关用户。

它可以在两个级别与Rebus完成,根据您的要求。如果这是不够看的邮件标题,你应该实现它作为一个“传输消息转发”,因为这会跳过反序列化,并提供了一个很好的API简单地转发消息:

Configure.With(...) 
    .Transport(t => t.UseMsmq("router")) 
    .Routing(r => { 
     r.AddTransportMessageForwarder(async transportMessage => { 
      var headers = transportMessage.Headers; 

      var subscribers = Decide(headers); 

      return ForwardAction.ForwardTo(subscribers); 
     }); 
    }) 
    .Start(); 

如果你需要看实际的消息,你应该只执行一个普通的消息处理程序,然后使用总线转发的短信:

public class Router : IHandleMessages<Data> 
{ 
    readonly IBus _bus; 

    public Router(IBus bus) 
    { 
     _bus = bus; 
    } 

    public async Task Handle(Data message) 
    { 
     var subscribers = Decide(message); 

     foreach(var subscriber in subscribers) 
     { 
      await _bus.Advanced.TransportMessage.ForwardTo(subscriber); 
     } 
    } 
} 

定制实现的路由器是最灵活的解决方案,可以实现任何你喜欢的逻辑,但因为你可以看到它稍微有点牵扯。


(*)卤面不允许一般使用通配符,虽然它直接传递的主题,RabbitMQ的,如果你碰巧使用的是作为传输,这意味着你实际上可以充分利用RabbitMQ的(见this issue有关的一些详细信息)

+0

感谢。 关于最后的解决方案(路由器/处理程序),我写了这些代码行,但在处理程序转发某些数据包对象后,我得到异常。 (总线为空) –

0
using (var trScope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled)) 
{ 
    scope.EnlistRebus(); 
    Packet packet = ReadFromDB() 
    activator.Bus.SendLocal(packet).Wait() 
    scope.Complete() 
} 


activator.Handle<Packet>(async (bus, packet) => 
{ 
    string subscriber = "subscriberA"; 
    await bus.Advanced.TransportMessage.Forward(subscriber); 
}); 
0
 using (var activator = new BuiltinHandlerActivator()) 
     { 
      activator.Handle<Packet>(async message => 
      { 
       string connectionString = 
        "Data Source=.;Initial Catalog=Rebus;User ID=sa;Password=123456"; 

       using (SqlConnection connection = new SqlConnection(connectionString)) 
       { 
        string queryString = @"INSERT INTO CLIENTPACKET(ID, CONTENT, SENT) VALUES(@id, @content, @sent)"; 
        connection.Open(); 

        using (SqlCommand command = new SqlCommand(queryString, connection)) 
        { 
         command.Parameters.Add(new SqlParameter("@id", message.ID)); 
         command.Parameters.Add(new SqlParameter("@content", message.Content)); 
         command.Parameters.Add(new SqlParameter("@sent", message.Sent)); 

         await command.ExecuteNonQueryAsync(); 
        } 
       } 
      }); 


      Configure.With(activator) 
       .Logging(l => l.ColoredConsole(minLevel: LogLevel.Warn)) 
       .Transport(t => t.UseMsmq(@"subscriberA")) 
       .Routing(r => r.TypeBased().MapAssemblyOf<Packet>("router")) 
       .Options(o => 
       { 
        TransactionOptions tranOp = new TransactionOptions(); 
        tranOp.IsolationLevel = IsolationLevel.ReadCommitted; 
        o.HandleMessagesInsideTransactionScope(tranOp); 

        o.SetNumberOfWorkers(2); 
        o.SetMaxParallelism(2); 
       }) 
       .Start(); 

      activator.Bus.Subscribe<Packet>().Wait(); 

      Console.WriteLine("Press ENTER to quit"); 
      Console.ReadLine(); 
     }