2016-05-17 62 views
1

我想在Rx中'拉链'任意数量的流,其中元素对应但可能会乱序处理。每个流的元素都有一个标识符,可用于将它们匹配在一起。例如。内容是这样的:通过加入属性结合许多流的Rx

public class Element 
{ 
    public string Key {get; set;} 
} 

通过其发生的指数通常情况下,压缩只会结合元素:

|-A-----------A 
|--B---------B- 
|-----C------C- 
|-----ABC-----ABC <- zip 

但是,如果我们想只匹配共享相同的关键要素是什么?我正在寻找的作品更多的是这样一个顺序:

(在这个例子中,关键是1或2)

|--2A-------1A---------- 
|----1B----------2B----- 
|------1C-----------2C-- 
|-----------1ABC----2ABC <- zipped by key 1 & 2 respectively 

我觉得适合群组加入这个场景,但它只是有两个观测量并链接他们很快失控。

我也看过And/Then /时,但并没有真正理解如何构造它的这种情况。

理想情况下,我想要一个扩展方法,我可以调用并提供结果选择器,其中结果选择器的输入保证具有相同的键。

你会如何解决这个问题?

回答

0

这是我在LinqPad中碰到的东西。它符合您的大理石图的要求。然而,它比我想要的更混乱。

的NuGet依赖关系上Rx-Testing

void Main() 
{ 
    TestScheduler scheduler = new TestScheduler(); 
    /* 
|--2A-------1A---------- 
|----1B----------2B----- 
|------1C-----------2C-- 
|-----------1ABC----2ABC <- zipped by key 1 & 2 respectively 

    */ 
    var sourceA = scheduler.CreateColdObservable(
     ReactiveTest.OnNext(3, "2A"), 
     ReactiveTest.OnNext(12, "1A")); 
    var sourceB = scheduler.CreateColdObservable(
     ReactiveTest.OnNext(5, "1B"), 
     ReactiveTest.OnNext(17, "2B")); 
    var sourceC= scheduler.CreateColdObservable(
     ReactiveTest.OnNext(7, "1C"), 
     ReactiveTest.OnNext(20, "2C")); 

    var observer = scheduler.CreateObserver<string>(); 


    var query = Observable.Merge(sourceA, sourceB, sourceC) 
     .GroupBy(x => GetKey(x)) 
     .SelectMany(grp => grp.Select(x=>GetValue(x)) 
           .Take(3) 
           .Aggregate(new List<string>(), 
             (accumulator, current) => { 
              accumulator.Add(current); 
              return accumulator; 
             }) 
          .Select(acc=>CreateGroupResult(grp.Key, acc))); 

    query.Subscribe(observer); 
    scheduler.Start(); 

    ReactiveAssert.AreElementsEqual(
     new[]{ 
      ReactiveTest.OnNext(12, "1ABC"), 
      ReactiveTest.OnNext(20, "2ABC") 
     }, 
     observer.Messages 
    ); 

} 

// Define other methods and classes here 
private static string CreateGroupResult(string key, IEnumerable<string> values) 
{ 
    var combinedOrderedValues = string.Join(string.Empty, values.OrderBy(v => v)); 
    return string.Format("{0}{1}", key, combinedOrderedValues); 
} 

private static string GetKey(string message) 
{ 
    return message.Substring(0, 1); 
} 

private static string GetValue(string message) 
{ 
    return message.Substring(1); 
}