2013-04-09 61 views
1

我喜欢Rx,但我遇到了一个问题,我一直在遇到。假设我们有一个上游IObservable<Foo>N下游顺序序列,其中每个序列只对满足一些简单谓词(比如foo.bar == someKey)的Foos感兴趣。无功扩展:Where()的问题

当然这是针对Where()操作一个简单的工作:

IObservable<Foo> foos = ...; 
foos.Where(foo => foo.bar == "abc").Subscribe(f => A(f)); 
foos.Where(foo => foo.bar == "xyz").Subscribe(f => B(f)); 
foos.Where(foo => foo.bar == "bla").Subscribe(f => C(f)); 
... 
[many more subscriptions for different bar values] 

什么将主要发生在这里的是,对上游生产的每个Foo,该Where()谓词将为该FooN次评估。它像线性搜索一样查找所有需要此订户的订户Foo。这一切都很好,正是我们(应该)期待在这里使用Where()

我的问题是,在我的情况下,N可能非常大,但想要任何特定Foo的用户子集非常小。通常,每个Foo将只有一个。这意味着我本质上是做一个缓慢的线性搜索,当我可以做一个非常有效的查找来找到这个Foo需要传播的少数下游序列。我的应用程序运行在非常关键的性能环境中,我无法承受这种低效率。

我已经绞尽脑汁想要找到一些更高效的优化方法,但我只能想出一些解决方案,这些解决方案涉及存储大量状态(映射订户等),并且必须非常小心地管理并发,这首先破坏了很多使用Rx的目的。我希望以现有的运营商的角度来处理这个问题。有没有人处理过这个问题,或知道一个好的解决方案?我很高兴提供更多细节。

编辑

我想我的例子是有点过于简单化。我没有处理与某些已知边界内的数值匹配的情况。 N仅用于说明目的。上面的更新示例。

+0

你会对所有可能的bar 0..N值,还是仅仅为某些? – 2013-04-09 17:54:49

+0

最好的可能是保持你的foos排序顺序。看看'List.BinarySearch',然后只是迭代调用'Subscribe'直到'foo.Bar> = N' – 2013-04-09 18:16:41

+0

对不起,我的例子太简单了,请参阅编辑和新示例代码。 – Tim 2013-04-09 19:14:26

回答

3

得到了戴夫·塞克斯顿了在Rx讨论板Codeplex上一个很好的解决方案:

https://rx.codeplex.com/discussions/439717

有关使用的GroupByGroupByUntil发布如何?

例如:(未经测试

IConnectableObservable<IGroupedObservable<string, Foo>> foosByBar = 
    (from foo in foos 
    group foo by foo.bar) 
    .Publish(); 

foosByBar.Where(g => g.Key == "abc").Take(1).SelectMany(g => g).Subscribe(A); 
foosByBar.Where(g => g.Key == "xyz").Take(1).SelectMany(g => g).Subscribe(B); 
foosByBar.Where(g => g.Key == "bla").Take(1).SelectMany(g => g).Subscribe(C); 

foosByBar.Connect(); 

的GroupBy使用词典查找每键以找到适当的可观察到的,其中值被推动。

发布广播分组,以便字典查找操作被所有观察者共享。

/采取执行谓词只有一次找到相应的组,然后收到该 组的每个值的广播与兴趣相同的密钥的任何其他观察员一起。

注意的GroupBy不重播IGroupedObservable所以你 必须设置所有订阅的连接之前。如果您想 而使用引用次数连接,那么也许你应该考虑 应用重播运营商的 结果的GroupBy

+0

你为什么不将他们在那里找到的解决方案发布到这个答案中? – 2013-04-10 16:06:11

+0

我认为从源头获取信息会更好,但无论如何CodePlex在某一天爆炸时我都会这样做。 – Tim 2013-04-10 18:08:19

0

有些东西正在存储状态,现在它只是可见的存储您通过Wheres添加的所有订阅者。目前还不清楚你是否意识到这一点,但foos必须通知每个观察员每条消息。所有Where所做的就是让大多数观察者只是检查谓词并返回,但是每个消息都会检查每个谓词。

构建一个包含作为观察者的处理程序的映射不会太困难,应该为您提供所需的性能收益。只需注册尽可能多的处理程序,然后将地图订阅到源观察点即可。如果一个Dictionary没有提供你需要的匹配语义,你将不得不提出一些其他的方案来减少查找,但总体思路是一样的。请注意,如果它有多个它应该处理的输入,您可以多次注册相同的处理程序,并且您可以为相同的输入注册多个处理程序。

class ObserverMap<T> : IObserver<T> 
{ 
    ObserverMap(Action<Exception> onError, Action onCompleted) 
    { 
     _onError = onError; 
     _onCompleted = onCompleted; 
     _handlers = new Dictionary<T, List<Action<T>>>(); 
    } 
    ObserverMap(Action<Exception> onError, Action onCompleted, IEqualityComparer<T> comparer) 
    { 
     _onError = onError; 
     _onCompleted = onCompleted; 
     _handlers = new Dictionary<T, List<Action<T>>>(comparer); 
    } 

    int _stopped; 
    Dictionary<T, List<Action<T>>> _handlers; 
    Action<Exception> _onError; 
    Action _onCompleted; 

    public void OnCompleted() 
    { 
     if (System.Threading.Interlocked.Exchange(ref _stopped, 1) == 0) 
     { 
      if (_onCompleted != null) _onCompleted(); 
     } 
    } 

    public void OnError(Exception error) 
    { 
     if (System.Threading.Interlocked.Exchange(ref _stopped, 1) == 0) 
     { 
      if (_onCompleted != null) _onCompleted(); 
     } 
    } 

    public void OnNext(T value) 
    { 
     if (_stopped != 0) return; 

     List<Action<T>> match; 
     if (_handlers.TryGetValue(value, out match)) 
     { 
      foreach (var handler in match) 
      { 
       handler(value); 
      } 
     } 
    } 

    public IDisposable RegisterHandler(T key, Action<T> handler) 
    { 
     if (handler == null) throw new ArgumentNullException("handler"); 

     List<Action<T>> match; 
     if (!_handlers.TryGetValue(key, out match)) 
     { 
      match = new List<Action<T>>(); 
      _handlers.Add(key, match); 
     } 
     match.Add(handler); 

     return System.Reactive.Disposables.Disposable.Create(() => match.Remove(handler)); 
    } 
} 
+0

我会建议,而不是写一个签名'IDisposable RegisterHandler(T键,动作处理程序)'用于添加订阅者的方法,写一个索引器属性/方法的签名更像'IObservable this [T key] {得到{return Observable.Create(...); }}。 – 2013-04-09 22:58:03

+0

是的,我知道状态正在被存储和每个oberver的通知(这是我描述线性搜索时的意思)。 Rx虽然整齐地把它弄干净了,我宁愿自己也不必写这种代码。我确实有类似于你所描述的解决方案,但我希望有一种更清洁的“内置”方式。 – Tim 2013-04-09 23:21:30