2012-03-18 71 views
25

我想给RX查询是不平凡的模型(对我来说):加入的Rx流

  • 在一个房间里有男人和女人。
  • 他们进入和离开房间,而在房间里有时他们改变他们的位置。
  • 每个男人可以在给定的时间看一个(或零)女人。
  • 每个人具有以下属性:

    class Man 
    { 
        public const int LookingAtNobody = 0; 
        public int Id { get; set; } 
        public double Location { get; set; } 
        public int LookingAt { get; set; } 
    } 
    
  • 每个女人具有以下属性:

    class Woman 
    { 
        public int Id { get; set; } 
        public double Location { get; set; } 
    } 
    
  • 为了表示我们有IObservable<IObservable<Man>>男子,并表示我们有IObservable<IObservable<Woman>>妇女。

你将如何使用的Rx产生从男人到女人的载体,他们正在寻找:IObservable<IObservable<Tuple<double,double>>>

帮助,这里有一些简单的情况下,一些单元测试:

public class Tests : ReactiveTest 
{ 
    [Test] 
    public void Puzzle1() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnCompleted<Man>(300)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(300), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
    } 
    [Test] 
    public void Puzzle2() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnCompleted<Man>(400)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(350), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
    } 
    [Test] 
    public void Puzzle3() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnNext(300, new Man { Id = 1, Location = 2.0, LookingAt = Man.LookingAtNobody }), 
      OnCompleted<Man>(400)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(300), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
    } 
    [Test] 
    public void Puzzle4() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnNext(300, new Man { Id = 1, Location = 3.0, LookingAt = 20 }), 
      OnNext(400, new Man { Id = 1, Location = 4.0, LookingAt = 20 }), 
      OnCompleted<Man>(500)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 
     var w2 = scheduler.CreateHotObservable(
      OnNext(155, new Woman { Id = 20, Location = 100.0 }), 
      OnNext(255, new Woman { Id = 20, Location = 200.0 }), 
      OnNext(355, new Woman { Id = 20, Location = 300.0 }), 
      OnCompleted<Woman>(455)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1), OnNext(50, w2)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(300), 
         }; 
     var expectedVector2 = new[] 
         { 
          OnNext(300, Tuple.Create(3.0, 200.0)), 
          OnNext(355, Tuple.Create(3.0, 300.0)), 
          OnNext(400, Tuple.Create(4.0, 300.0)), 
          OnCompleted<Tuple<double,double>>(455), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
     ReactiveAssert.AreElementsEqual(expectedVector2, innerResults[1]); 
    } 

    private static IEnumerable<Recorded<Notification<IList<Recorded<Notification<Tuple<double, double>>>>>>> runQuery(TestScheduler scheduler, IObservable<IObservable<Woman>> women, IObservable<IObservable<Man>> men) 
    { 
     // assuming nested sequences are hot 
     var vectors = 
      from manDuration in men 
      join womanDuration in women on manDuration equals womanDuration 
      select from man in manDuration 
        join woman in womanDuration on manDuration equals womanDuration 
        where man.LookingAt == woman.Id 
        select Tuple.Create(man.Location, woman.Location); 

     var query = vectors.Select(vectorDuration => 
     { 
      var vectorResults = scheduler.CreateObserver<Tuple<double, double>>(); 
      vectorDuration.Subscribe(vectorResults); 
      return vectorResults.Messages; 
     }); 

     var results = scheduler.Start(() => query, 0, 0, 1000).Messages; 
     return results; 
    } 
} 

(注:这个问题是交叉贴到Rx论坛:http://social.msdn.microsoft.com/Forums/en-US/rx/thread/e73ae4e2-68c3-459a-a5b6-ea957b205abe

+4

DAT IEnumerable <记录<通知>>>>>> – Asti 2012-03-19 08:54:44

+0

您发布到MSDN论坛的帖子以及创建的喋喋不休的数量证明这不是一件好事问答网站问题。 – 2013-03-27 23:14:35

+9

“不要过河” - Egon Spengler博士 – 2013-10-11 16:40:28

回答

1

如果我理解正确,目标是创建一个“跟随观察者”的观察点,其中一个“跟踪观察者”从一个男人开始注视一个女人开始,到该男人停止注视该女人时结束。 “follow observable”应该由男人和女人的最近位置的元组组成。

这里的想法是使用CombineLatest,它将带有两个可观测值,当它们中的任何一个产生一个值时,组合器将针对观测值的两个最近值进行评估,这会在组合观测值中产生一个值。但是,CombineLatest只有在两个可观测量都完成时才会完成。在这种情况下,我们希望在两个源中的任何一个完成时完成观察值。为了做到这一点,我们定义了下面的扩展方法(我不相信这样的方法已经存在,但也有可能是一个简单的解决方案):

public static IObservable<TSource> 
    UntilCompleted<TSource, TWhile>(this IObservable<TSource> source, 
             IObservable<TWhile> lifetime) 
{ 
    return Observable.Create<TSource>(observer => 
    { 
    var subscription = source.Subscribe(observer); 
    var limiter = lifetime.Subscribe(next => { },() => 
    { 
     subscription.Dispose(); 
     observer.OnCompleted(); 
    }); 
    return new CompositeDisposable(subscription, limiter); 
    }); 
} 

,此方法类似于TakeUntil,但它代替直到lifetime产生一个值,直到lifetime完成。我们也可以定义一个简单的扩展方法,它满足的谓词的第一条痕:

public static IObservable<TSource> 
    Streak<TSource>(this IObservable<TSource> source, 
         Func<TSource, bool> predicate) 
{ 
    return source.SkipWhile(x => !predicate(x)).TakeWhile(predicate); 
} 

现在对于最终的查询,我们将所有的人使用CombineLatest所有妇女,并完成可观察使用早期UntilCompleted。为了得到“遵守观察结果”,我们选择男人正在看着女人的连胜。然后我们简单地将它映射到一个位置元组。

var vectors = 
    from manDuration in men 
    from womanDuration in women 
    select manDuration 
    .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w }) 
    .UntilCompleted(womanDuration) 
    .UntilCompleted(manDuration) 
    .Streak(pair => pair.Man.LookingAt == pair.Woman.Id) 
    .Select(pair => Tuple.Create(pair.Man.Location, pair.Woman.Location)); 

这通过所有的测试,但其中一人在看一会儿女人10,然后在20一会儿,然后在10再次有一段时间不处理的情况;仅使用第一个条纹。遵守所有的条纹,我们可以使用下面的扩展方法,它返回一个可观察的条纹:

public static IObservable<IObservable<TSource>> 
    Streaks<TSource>(this IObservable<TSource> source, 
         Func<TSource, bool> predicate) 
{ 
    return Observable.Create<IObservable<TSource>>(observer => 
    { 
    ReplaySubject<TSource> subject = null; 
    bool previous = false; 
    return source.Subscribe(x => 
    { 
     bool current = predicate(x); 
     if (!previous && current) 
     { 
     subject = new ReplaySubject<TSource>(); 
     observer.OnNext(subject); 
     } 
     if (previous && !current) subject.OnCompleted(); 
     if (current) subject.OnNext(x); 
     previous = current; 
    },() => 
    { 
     if (subject != null) subject.OnCompleted(); 
     observer.OnCompleted(); 
    }); 
    }); 
} 

通过订阅只有一次的源流,并通过使用ReplaySubject,这种方法适用于热以及冷观察。现在对于最终的查询,我们选择所有条纹如下:

var vectors = 
    from manDuration in men 
    from womanDuration in women 
    from streak in manDuration 
    .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w }) 
    .UntilCompleted(womanDuration) 
    .UntilCompleted(manDuration) 
    .Streaks(pair => pair.Man.LookingAt == pair.Woman.Id) 
    select streak.Select(pair => 
    Tuple.Create(pair.Man.Location, pair.Woman.Location)); 
0

我不知道我理解你为什么建模的男人和女人这两个位置的流作为IObservable<IObservable<T>>,而不仅仅是一个IObservable<T>,但是这可能工作:

public static IObservable<Tuple<double, double>> GetLocationsObservable(IObservable<IObservable<Man>> menObservable, 
                      IObservable<IObservable<Woman>> womenObservable) 
{ 
    return Observable.CombineLatest(
     menObservable.Switch(), 
     womenObservable.Switch(), 
     (man, woman) => new {man, woman}) 
      .Where(manAndWoman => manAndWoman.man.LookingAt == manAndWoman.woman.Id) 
      .Select(manAndWoman => Tuple.Create(manAndWoman.man.Location, manAndWoman.woman.Location)); 
} 

时将其压开关基本上是“开关”新观察到的,这拉平流。 where和select是相当简单的。

我有一个偷偷摸摸的怀疑我误解了需求的一些东西,但我想我会提交我的答案,以防万一它有帮助。