2011-09-14 30 views
1

我正在使用无扩展的应用程序,并钻进了以下问题:问题与无扩展名观察员

说,我有两个观察者P和Q,我想建立第三个观察者[R,如果两个值P都是用Q,R输出0。而且如果在P来的Q-后,R输出这些值传递的方法的结果,是这样的:

P0 Q0 -> R0 = f(P0,Q0)  
P1   -> R1 = 0  
P2 Q1 -> R2 = f(P2,Q1)  
P3   -> R3 = 0  
P4   -> R4 = 0  
P5 Q2 -> R5 = f(P5,Q2) 
(...) 

和的值进入在以下的obsevers订购:

P0 Q0 P1 P2 Q1 P3 P4 P5 Q2

感谢您的帮助。

+0

那么如果一个Q在又一个Q&来自输入的 –

+0

永远不会出现两个Q – Ariel

+0

虽然您可能碰巧知道这一点,但您几乎可以肯定需要考虑在发生实施时应该如何处理。 –

回答

0

假设我们有两种方法

  1. 之前,合并两个每当第一个observable在第二个元素之前产生一个元素时,通过使用选择器函数将可观察序列合并为一个可观察序列。
  2. 没有,每次有两个项目从第一个可见项没有任何项目从第二个观察到时,将可观察序列合并到其他可观察序列。

用这种方法,问题几乎解决了。

IObservable<TP> P = // observer P 
IObservable<TQ> Q = // observer Q 

var PP = P.Without((prev, next) => 0, Q); 
var PQ = P.Before(Q, (p,q) => f(p,q)); // apply the function 

var ResultSecuence = PP.Merge(PQ); 

而且这里有两种方法

public static class Observer 
{ 
    /// <summary> 
    /// Merges two observable sequences into one observable sequence by using the selector function 
    /// whenever the first observable produces an element rigth before the second one. 
    /// </summary> 
    /// <param name="first"> First observable source.</param> 
    /// <param name="second">Second observable source.</param> 
    /// <param name="resultSelector">Function to invoke whenever the first observable produces an element rigth before the second one.</param> 
    /// <returns> 
    /// An observable sequence containing the result of combining elements of both sources 
    /// using the specified result selector function. 
    /// </returns> 
    public static IObservable<TResult> Before<TLeft, TRight, TResult>(this IObservable<TLeft> first, IObservable<TRight> second, Func<TLeft, TRight, TResult> resultSelector) 
    { 
     var result = new Subject<TResult>(); 

     bool firstCame = false; 
     TLeft lastLeft = default(TLeft); 

     first.Subscribe(item => 
     { 
      firstCame = true; 
      lastLeft = item; 
     }); 

     second.Subscribe(item => 
     { 
      if (firstCame) 
       result.OnNext(resultSelector(lastLeft, item)); 

      firstCame = false; 
     }); 

     return result; 
    } 

    /// <summary> 
    /// Merges an observable sequence into one observable sequence by using the selector function 
    /// every time two items came from <paramref name="first"/> without any item of any observable 
    /// in <paramref name="second"/> 
    /// </summary> 
    /// <param name="first"> Observable source to merge.</param> 
    /// <param name="second"> Observable list to ignore.</param> 
    /// <param name="resultSelector">Function to invoke whenever the first observable produces two elements without any of the observables in the secuence produces any element</param> 
    /// <returns> 
    /// An observable sequence containing the result of combining elements 
    /// using the specified result selector function. 
    /// </returns> 
    public static IObservable<TResult> Without<TLeft, TResult>(this IObservable<TLeft> first, Func<TLeft, TLeft, TResult> resultSelector,params IObservable<object>[] second) 
    { 
     var result = new Subject<TResult>(); 

     bool firstCame = false; 
     TLeft lastLeft = default(TLeft); 

     first.Subscribe(item => 
     { 
      if (firstCame) 
       result.OnNext(resultSelector(lastLeft, item)); 

      firstCame = true; 
      lastLeft = item; 
     }); 

     foreach (var observable in second) 
      observable.Subscribe(item => firstCame = false); 

     return result; 
    }   
} 
1

我想我有一个解决方案给你。

如果我假设你有如下定义:

IObservable<int> ps = ...; 
IObservable<int> qs = ...; 

Func<int, int, int> f = ...; 

一起来,我创建的功能字典来计算最终值:

var fs = new Dictionary<string, Func<int, int, int?>>() 
{ 
    { "pp", (x, y) => 0 }, 
    { "pq", (x, y) => f(x, y) }, 
    { "qp", (x, y) => null }, 
    { "qq", (x, y) => null }, 
}; 

的“P” &每个组合“ q“在那里。

然后您可以创建一个合并观察到的是这样的:

var pqs = 
    (from p in ps select new { k = "p", v = p }) 
     .Merge(from q in qs select new { k = "q", v = q }); 

我现在知道该序列产生哪些价值。

接下来,我发布了组合列表,因为我不知道源代码的观察对象是热的还是冷的 - 因此发布它们会使它们变得很热 - 然后我将已发布的可观察对象分别跳过一个和零。然后我知道他们来自的每一对价值观和原始观察者。应用字典功能很容易(过滤掉任何空值)。

这就是:

var rs = 
    from kvv in pqs.Publish(_pqs => 
     _pqs.Skip(1).Zip(_pqs, (pq1, pq0) => new 
     { 
      k = pq0.k + pq1.k, 
      v1 = pq1.v, 
      v0 = pq0.v 
     })) 
    let r = fs[kvv.k](kvv.v0, kvv.v1) 
    where r.HasValue 
    select r.Value; 

这是否对你的工作?

+0

_pqs.Skip(1).Zip(_pqs)将导致重复的值被处理。例如。你有ps = {p1,p2,p3}和qs = {q1,q2,q3},你会得到pqs = {p1,q1,p2,q2,p3,q3},然后{{p1,q1}, {q1,p2},{p2,q2},...}产生比所需更多的值。 –

+0

@Konstantin - 我的理解是,这是必需的行为。我已经插入了输入序列 - “P0,Q0,P1,P2,Q1,P3,P4,P5,Q2”,并获得了所需的输出序列“f(0,0),0,f(2, 1),0,0,f(5,2)“。我错过了什么? – Enigmativity

+0

这是因为你只是跳过“qp”和“qq”的值,但是后者应该返回0(参见问题的评论),即使可以忽略“qp”(这看起来不可能),它看起来有点古怪介绍假货。 –

1

总的想法很简单:你合并P和Q,使用BufferWithCount(2)根据你的逻辑得到的值对,然后流程对:

 

P.Merge(Q).BufferWithCount(2).Select(values => 
{ 
    var first = values[0]; 
    var second = values[1]; 
    if (first is P && second is P || 
     first is Q && second is Q) 
    { 
     return 0; 
    } 

    if (first is P) 
    { 
     return selector(first, second); 
    } 
    else // suppose Q, P is a valid sequence as well. 
    { 
     return selector(second, first); 
    } 
}); 
 

现在最困难的部分是合并P和Q如果他们是不同类型,然后在Select中区分它们。如果它们是同一类型的,你可以使用一些简单的像由Enigmativity提出的方法,即

 

var pqs = 
    (from p in ps select new { k = "p", v = p }) 
     .Merge(from q in qs select new { k = "q", v = q }); 
 

现在最困难的部分是,如果他们是不同类型的,合并它们,我们需要一些常见的包装类型,像,例如Data.Either从哈斯克尔:

 

public abstract class Either<TLeft, TRight> 
{ 
    private Either() 
    { 
    } 

    public static Either<TLeft, TRight> Create(TLeft value) 
    { 
     return new Left(value); 
    } 

    public static Either<TLeft, TRight> Create(TRight value) 
    { 
     return new Right(value); 
    } 

    public abstract TResult Match<TResult>(
     Func<TLeft, TResult> onLeft, 
     Func<TRight, TResult> onRight); 

    public sealed class Left : Either<TLeft, TRight> 
    { 
     public Left(TLeft value) 
     { 
      this.Value = value; 
     } 

     public TLeft Value 
     { 
      get; 
      private set; 
     } 

     public override TResult Match<TResult>(
      Func<TLeft, TResult> onLeft, 
      Func<TRight, TResult> onRight) 
     { 
      return onLeft(this.Value); 
     } 
    } 

    public sealed class Right : Either<TLeft, TRight> 
    { 
     public Right(TRight value) 
     { 
      this.Value = value; 
     } 

     public TRight Value 
     { 
      get; 
      private set; 
     } 

     public override TResult Match<TResult>(
      Func<TLeft, TResult> onLeft, 
      Func<TRight, TResult> onRight) 
     { 
      return onRight(this.Value); 
     } 
    } 
} 
 

滑稽的是,已经有在System.Reactive.dll类似无论阶级,不幸的是它的内部,所以我们需要我们自己的实现。现在,我们可以将两个P和Q成要么与出液进行(我概括了一点,所以你只能返回任何结果,而不是为int):

 

public static IObservable<TResult> SmartZip<TLeft, TRight, TResult>(
    IObservable<TLeft> leftSource, 
    IObservable<TRight> rightSource, 
    Func<TLeft, TRight, TResult> selector) 
{ 
    return Observable 
     .Merge(
      leftSource.Select(Either<TLeft, TRight>.Create), 
      rightSource.Select(Either<TLeft, TRight>.Create)) 
     .BufferWithCount(2) 
     .Select(values => 
      { 
       // this case was not covered in your question, 
       // but I've added it for the sake of completeness. 
       if (values.Count < 2) 
       { 
        return default(TResult); 
       } 

       var first = values[0]; 
       var second = values[1]; 

       // pattern-matching in C# is really ugly. 
       return first.Match(
        left => second.Match(
         _ => default(TResult), 
         right => selector(left, right)), 
        right => second.Match(
         left => selector(left, right), 
         _ => default(TResult))); 
      }); 
} 
 

这里是一个小的演示所有这些可怕的丑陋的东西。

 

private static void Main(string[] args) 
{ 
    var psource = Observable 
     .Generate(1, i => i < 100, i => i, i => i + 1) 
     .Zip(Observable.Interval(TimeSpan.FromMilliseconds(10.0)), (i, _) => i); 
    var qsource = Observable 
     .Generate(1, i => i < 100, i => (double)i * i, i => i + 1) 
     .Zip(Observable.Interval(TimeSpan.FromMilliseconds(30.0)), (i, _) => i); 

    var result = SmartZip(
     psource, 
     qsource, 
     (p, q) => q/p).ToEnumerable(); 
    foreach (var item in result) 
    { 
     Console.WriteLine(item); 
    } 
} 
 
0

如果我有正确理解那么下面的问题是一个通用的功能,它可以处理这样的情况:

public static IObservable<T> MyCombiner<T>(IObservable<T> P, IObservable<T> Q, T defaultValue,Func<T,T,T> fun) 
     { 
      var c = P.Select(p => new { Type = 'P', Value = p }) 
         .Merge(Q.Select(p => new { Type = 'Q', Value = p })); 
      return c.Zip(c.Skip(1), (a, b) => 
      { 
       if (a.Type == 'P' && b.Type == 'P') 
        return new { Ok = true, Value = defaultValue }; 
       if (a.Type == 'P' && b.Type == 'Q') 
        return new { Ok = true, Value = fun(a.Value, b.Value) }; 
       else 
        return new { Ok = false, Value = default(T) }; 
      }).Where(b => b.Ok).Select(b => b.Value); 

     }