2016-05-16 31 views
2

这个问题是基于类似名称的问题here,有两点不同:我可以通过匹配键配对两个可观测序列 - 使用重复键吗?

  • 我匹配的多个密钥。没问题。
  • 键可能重复。问题。

我的测试代码如下。我需要以下行为:

  • 一个CoordBundle是只要至少一个CoordMetrics和一个CoordData观察公布。
  • 如果某个特定的X/Y键在任何一个可观察对象上重复出现,则会发布一个新的CoordBundle。

我该做什么才能做到这一点?

public class CoordMetrics 
{ 
    internal CoordMetrics(int x, int y, IEnumerable<IMetric> metrics) 
    { 
     X = x; 
     Y = y; 
     Metrics = metrics; 
    } 
    internal int X { get; private set; } 
    internal int Y { get; private set; } 
    internal IEnumerable<IMetric> Metrics { get; private set; } 
} 

public class CoordData 
{ 
    internal CoordData(int x, int y, IEnumerable<IDatum> data) 
    { 
     X = x; 
     Y = y; 
     Data = data; 
    } 

    internal int X { get; private set; } 
    internal int Y { get; private set; } 
    internal IEnumerable<IDatum> Data { get; private set; } 
} 

public class CoordBundle 
{ 
    internal CoordBundle(int x, int y, IEnumerable<IMetric> metrics, IEnumerable<IDatum> data) 
    { 
     X = x; 
     Y = y; 
     Metrics = metrics; 
     Data = data; 
    } 

    internal int X { get; private set; } 
    internal int Y { get; private set; } 
    internal IEnumerable<IMetric> Metrics { get; private set; } 
    internal IEnumerable<IDatum> Data { get; private set; } 
} 

[TestClass] 
public class PairingTest 
{ 
    [TestMethod, TestCategory("Temp")] 
    public void PairedObservableTest() 
    { 
     Trace.Listeners.Add(new TextWriterTraceListener(Console.Out)); 
     var aSource = new Subject<CoordMetrics>(); 
     var bSource = new Subject<CoordData>(); 

     var paired = Observable.Merge(aSource.Select(a => new Pair(a, null)), bSource.Select(b => new Pair(null, b))) 
           .GroupBy(p => p.Item1 != null ? new { p.Item1.X, p.Item1.Y } : new { p.Item2.X, p.Item2.Y }) 
           .SelectMany(g => g.Buffer(2).Take(1)) 
           .Select(g => new Pair(
            g.ElementAt(0).Item1 ?? g.ElementAt(1).Item1, 
            g.ElementAt(0).Item2 ?? g.ElementAt(1).Item2)) 
           .Select(t => new CoordBundle(t.Item1.X, t.Item1.Y, t.Item1.Metrics, t.Item2.Data)); 

     paired.Subscribe(g => Trace.WriteLine(String.Format("{0},{1}", g.X, g.Y))); 

     bSource.OnNext(new CoordData(2, 1, Enumerable.Empty<IDatum>())); 
     aSource.OnNext(new CoordMetrics(2, 2, Enumerable.Empty<IMetric>())); 
     aSource.OnNext(new CoordMetrics(1, 1, Enumerable.Empty<IMetric>())); 
     bSource.OnNext(new CoordData(1, 2, Enumerable.Empty<IDatum>())); 
     bSource.OnNext(new CoordData(2, 2, Enumerable.Empty<IDatum>())); 
     bSource.OnNext(new CoordData(1, 1, Enumerable.Empty<IDatum>())); 
     aSource.OnNext(new CoordMetrics(1, 2, Enumerable.Empty<IMetric>())); 
     aSource.OnNext(new CoordMetrics(2, 1, Enumerable.Empty<IMetric>())); 
     aSource.OnNext(new CoordMetrics(2, 2, Enumerable.Empty<IMetric>())); 
     bSource.OnNext(new CoordData(2,2,Enumerable.Empty<IDatum>())); 
    } 
} 

所需的输出 - 只以上的输出码的前4行:

2,2 
1,1 
1,2 
2,1 
2,2 
2,2 
+0

如果您创建了一个*最小*完整的可验证示例http://stackoverflow.com/help/mcve,将会非常有用。 “Pair”,“IMetric”和“IData”类型未定义。它似乎也不是你的例子实际需要的。最后,在你的测试中似乎没有断言,如果不是你的评论,你会将你的期望转移到一组断言上,这将是非常好的。 –

+0

伟大的反馈,我会尽量让我的问题下一次更清洁一点。 – ket

回答

1

我想我有你想要的。 公平地说,这不是一个简单的问题。 一个序列是另一个序列的种子是相当普遍的,但在这里你的复杂性是任一个序列都可能是另一个序列的种子。

我得到一个工作解决方案的第一件事是将其分解成可验证的单元测试。 我建议使用TestScheduler及其关联的类型来做到这一点(而不是科目等)。

我从我认为您的要求创建了大理石图。 然后我可以将它映射到两个测试输入序列和预期的输出序列。

最后一部分是实际创建查询。

我以*结尾的方法是创建两个序列,它们将尝试从主序列和子序列进行匹配 - >SelectMany + Where。 但是,因为这两个输入都可以扮演主序列的角色,所以我需要这样做两次。 因为我会订阅两次,所以我需要分享序列 - >Publish()。 也作为每个序列可以产生多个值,我需要取消匹配从前面的匹配时重复到达 - >TakeUntil。最后,我将两个结果集合在一起 - >Merge

*我认为GroupJoin & CombineLatest但他们似乎并没有为我工作。

[TestClass] 
public class PairingTest 
{ 
    [TestMethod, TestCategory("Temp")] 
    public void PairedObservableTest() 
    { 
     var scheduer = new TestScheduler(); 

     /* 
     Legend 
      a = aSource (CoordMetrics) 
      b = bSource (CoordData) 
      r = expected result 


     a ----2--1-----------1--2--2----- 
       2 1   2 1 2 

     b -2--------1--2--1-----------2-- 
      1  2 2 1   2 

     r -------------2--1--1--2--2--2-- 
         2 1 2 1 2 2 
     */ 
     var aSource = scheduer.CreateColdObservable<CoordMetrics>(
      ReactiveTest.OnNext(5, new CoordMetrics(2, 2)), 
      ReactiveTest.OnNext(8, new CoordMetrics(1, 1)), 
      ReactiveTest.OnNext(20, new CoordMetrics(1, 2)), 
      ReactiveTest.OnNext(23, new CoordMetrics(2, 1)), 
      ReactiveTest.OnNext(26, new CoordMetrics(2, 2)) 
     ); 
     var bSource = scheduer.CreateColdObservable<CoordData>(
      ReactiveTest.OnNext(2, new CoordData(2, 1)), 
      ReactiveTest.OnNext(11, new CoordData(1, 2)), 
      ReactiveTest.OnNext(14, new CoordData(2, 2)), 
      ReactiveTest.OnNext(17, new CoordData(1, 1)), 
      ReactiveTest.OnNext(29, new CoordData(2, 2)) 
     ); 

     var testObserver = scheduer.CreateObserver<string>(); 
     Implementation(aSource, bSource) 
      .Subscribe(testObserver); 



     scheduer.Start(); 

     ReactiveAssert.AreElementsEqual(
      new[] { 
        ReactiveTest.OnNext(14, "2,2"), 
        ReactiveTest.OnNext(17, "1,1"), 
        ReactiveTest.OnNext(20, "1,2"), 
        ReactiveTest.OnNext(23, "2,1"), 
        ReactiveTest.OnNext(26, "2,2"), 
        ReactiveTest.OnNext(29, "2,2") 
       }, 
      testObserver.Messages 
     ); 
    } 

    private static IObservable<string> Implementation(IObservable<CoordMetrics> aSource, IObservable<CoordData> bSource) 
    { 
     return Observable.Create<string>(observer => 
     { 
      var aShared = aSource.Publish(); 
      var bShared = bSource.Publish(); 

      var fromA = aShared.SelectMany(a => bShared 
        //Find matching values from B's 
        .Where(b => a.X == b.X && a.Y == b.Y) 
        //Only run until another matching A is produced 
        .TakeUntil(aShared.Where(a2 => a2.X == a.X && a2.Y == a.Y)) 
        //Project/Map to required type. 
        .Select(b => new CoordBundle(a.X, a.Y /*, a.Metrics, b.Data*/)) 
       ); 

      var fromB = bShared.SelectMany(b => aShared 
        //Find matching values from A's 
        .Where(a => a.X == b.X && a.Y == b.Y) 
        //Only run until another matching B is produced 
        .TakeUntil(bShared.Where(b2 => b2.X == b.X && b2.Y == b.Y)) 
        //Project/Map to required type. 
        .Select(a => new CoordBundle(a.X, a.Y /*, a.Metrics, b.Data*/)) 
       ); 

      var paired = Observable.Merge(fromA, fromB); 

      paired 
       .Select(g => String.Format("{0},{1}", g.X, g.Y)) 
       .Subscribe(observer); 

      return new CompositeDisposable(aShared.Connect(), bShared.Connect()); 
     }); 
    } 
} 

// Define other methods and classes here 
public class CoordMetrics 
{ 
    internal CoordMetrics(int x, int y) 
    { 
     X = x; 
     Y = y; 
    } 
    internal int X { get; private set; } 
    internal int Y { get; private set; } 
} 

public class CoordData 
{ 
    internal CoordData(int x, int y) 
    { 
     X = x; 
     Y = y; 
    } 

    internal int X { get; private set; } 
    internal int Y { get; private set; } 
} 

public class CoordBundle 
{ 
    internal CoordBundle(int x, int y) 
    { 
     X = x; 
     Y = y; 
    } 

    internal int X { get; private set; } 
    internal int Y { get; private set; } 
} 
public class Pair 
{ 
    public Pair(CoordMetrics x, CoordData y) 
    { 
     Item1 = x; 
     Item2 = y; 
    } 
    public CoordMetrics Item1 { get; set; } 
    public CoordData Item2 { get; set; } 
} 
+0

这个工作非常完美,并且对于如何实际测试observables也非常有用。谢谢! – ket

相关问题