2017-04-17 60 views
2

我有3个可观察的事件流 - 按日期顺序 - (主要事件,关于销售的事件和关于客户的事件) - 每个流包含与各种事件有关的事件类型车辆,每个事件都有车辆ID和各种其他属性。事件可以出现在一辆车上,然后是另一辆车等。所以基本上我试图根据VehicleID关联三个独立的事件流 - 这听起来像是应该直截了当的事情。我不熟悉任何形式的复杂可观察节目,所以这证明相当困难。使用CombineLatest关联多个群组的三个事件流

我想随时调用一个函数,在任何流上看到车辆的新事件(我想基本上是combineLatest)。我可以做到这一点,如果我过滤每个流只包含一个车辆的事件,所以Where,但我不知道如何GroupBy,然后得到每个组的最新。我想我想要合并这些流,但是在每组车辆上都会合并最快的。

下面将打印所有我希望创建的对象,仅用于VehcileID = 1。我希望做到以下,但所有的车辆。如果我为每一个VehcileID循环播放这个,这会给我我想要的输出 - 但是这看起来不像臀部可观察的 - 所有东西都是我应该瞄准的禅态。

Observable.CombineLatest(mainEvents.Where(a=>a.VehcileID==1),saleEventsGroup.Where(a=>a.VehcileID==1),customerEventsGroup.Where(a=>a.VehcileID==1),(main, sale, customer)=>{ 
     //Basically flattening various properties from latest state of the 3 streams for current vehicle with some mapping 
     return ComplexObject(){};})  
     .Subscribe(Console.WriteLine); 

我怎样才能得到结合每个车辆的每个流的最新事件。

任何意见,将不胜感激

回答

2

这个怎么样?我只在这里做两个流,但这个想法可以很容易地扩展到三个

[TestMethod] 
    public void GroupByWithMultipleStreams() 
    { 
     Subject<Notification> producer = new Subject<Notification>(); 
     Subject<RelatedToNotification> otherThingProducer = new Subject<RelatedToNotification>();    

     Observable.Merge(
      producer.Select(n => new { Id = n.Id, notification = n, relatedNotification = (RelatedToNotification)null }), 
      otherThingProducer.Select(rn => new { Id = rn.NotificationId, notification = (Notification)null, relatedNotification = rn })) 
      .GroupBy(x => x.Id) 
      .SelectMany(obs => 
      { 
       return obs.Scan(new ComplexObject() { Id = obs.Key }, (acc, input) => 
       { 
        acc.Notification = input.notification ?? acc.Notification; 
        acc.Related = input.relatedNotification ?? acc.Related; 
        return acc; 
       }); 
      }) 
      .Where(result => result.Notification != null && result.Related != null) // if you only want it to fire when everything has a value 
      .Subscribe(result => 
      { 
       //do something with the results here 
      } 
      ); 

     producer.OnNext(new Notification() { Id = 1, Version = 1 }); 
     producer.OnNext(new Notification() { Id = 1, Version = 2 }); 
     producer.OnNext(new Notification() { Id = 2, Version = 17 }); 
     producer.OnNext(new Notification() { Id = 1, Version = 3 }); 
     producer.OnNext(new Notification() { Id = 9, Version = 0 }); 
     producer.OnNext(new Notification() { Id = 9, Version = 1 }); 
     otherThingProducer.OnNext(new RelatedToNotification() { NotificationId = 2, SomeData = "2data" }); 
     otherThingProducer.OnNext(new RelatedToNotification() { NotificationId = 2, SomeData = "2data1" }); 
     otherThingProducer.OnNext(new RelatedToNotification() { NotificationId = 9, SomeData = "9Data" }); 
     producer.OnNext(new Notification() { Id = 2, Version = 1 }); 

    } 

    class ComplexObject 
    { 
     public int Id { get; set; } 
     public Notification Notification { get; set; } 
     public RelatedToNotification Related { get; set; } 
    } 

    class Notification 
    { 
     public int Id { get; set; } 
     public int Version { get; set; } 

     public string Name { get; set; } 
    } 

    public class RelatedToNotification 
    { 
     public int NotificationId { get; set; } 
     public string SomeData { get; set; } 
    } 
+1

一切都有一个共同的基类,所以这变得更加简单。这是合并,selectmany和积累我不知道。非常感谢! – GraemeMiller