2013-03-20 124 views
0

我需要反弹输入流。Rx反弹输入

在状态1的第一次出现时,我需要等待5秒钟并确认laste状态是否也为1. 只有比我有稳定的信号。

(time) 0-1-2-3-4-5-6-7-8-9 
(state) 0-0-0-0-0-1-0-1-0-1 
(result)     -> 1 

这里是一个非稳定信号的例子。

(time) 0-1-2-3-4-5-6-7-8-9 
(state) 0-0-0-0-0-1-0-1-0-0 
(result)     -> 0 

我试图用一个缓冲区,但缓冲区有固定的出发点,我需要等待开始我的第一个事件5秒。

+0

如果示例2中的下一个状态为0-1,您会希望结果如何? – 2013-03-20 23:41:41

+0

每秒产生一个输出吗?我同意下面的JerKilmball,一些关于你的用例的更多信息会有帮助 – AlexFoxGill 2013-03-21 08:57:21

回答

3

以你的要求的字面

在状态1的第一次出现,我需要等待5秒钟, 验证是否乐特状态也1.只比我有一份稳定的 信号。

我可以想出几种方法来解决这个问题。 为了澄清我的假设,您只需要推出第一次出现1次后5秒产生的最后一个值。这将导致产生0或1的单个值序列(即,不管过去产生的任何其他值距离源序列5秒)

这里我用一些jiggery-pokery重新创建序列。

var source = Observable.Timer(TimeSpan.Zero,TimeSpan.FromSeconds(1)) 
    .Take(10) 
    .Select(i=>{if(i==5 || i==7 || i==9){return 1;}else{return 0;}}); //Should produce 1; 
    //.Select(i=>{if(i==5 || i==7){return 1;}else{return 0;}}); //Should produce 0; 

下面的所有选项看起来共享序列。要在Rx中安全地共享一个序列,我们发布()并连接它。我通过RefCount()运算符使用自动连接。

var sharedSource = source.Publish().RefCount(); 

1)在此解决方案中,我们取第一个值1,然后将选定的序列值缓冲到5秒的缓冲区大小。我们只取这些缓冲区中的第一个。一旦我们得到这个缓冲区,我们得到最后一个值并推送它。如果缓冲区是空的,我假定我们推动一个,因为最后一个值是'1',启动缓冲区运行。

sharedSource.Where(state=>state==1) 
      .Take(1) 
      .SelectMany(_=>sharedSource.Buffer(TimeSpan.FromSeconds(5)).Take(1)) 
      .Select(buffer=> 
      { 
       if(buffer.Any()) 
       { 
        return buffer.Last(); 
       } 
       else{ 
        return 1; 
       } 
      }) 
      .Dump(); 

2)在这个解决方案,我采取的方法只有一次,我们得到一个有效的值(1),开始听,然后直到定时器触发终止采取一切值。从这里我们拿出最后一个值。

var fromFirstValid = sharedSource.SkipWhile(state=>state==0); 
fromFirstValid 
    .TakeUntil(
     fromFirstValid.Take(1) 
        .SelectMany(_=>Observable.Timer(TimeSpan.FromSeconds(5)))) 
    .TakeLast(1) 
    .Dump(); 

3)在该溶液I使用窗口操作者创建时的“1”的第一个值发生打开一个窗口,然后关闭时5秒为止。我们再次取最后一个值

sharedSource.Window(
       sharedSource.Where(state=>state==1), 
       _=>Observable.Timer(TimeSpan.FromSeconds(5))) 
      .SelectMany(window=>window.TakeLast(1)) 
      .Take(1) 
      .Dump(); 

所以很多不同的方法来皮肤猫。

+0

+1 - 很好的答案!你知道,我粘贴一个链接到你的文章窗口/加入/等约三次血腥的一周... :) – JerKimball 2013-03-22 18:05:21

+0

呜呼!我非常激动,它为你提供了价值。 – 2013-03-22 18:06:21

+0

太棒了!我使用了解决方案2.如果您每天都不使用Rx,那么很难将您的大脑切换到Rx模式。谢谢! – 2013-04-08 18:43:14

1

这听起来(一览)像你想Throttle,不Buffer,虽然您的使用情况下,一些详细信息,将有助于引脚下来 - 无论如何,这里是你会如何Throttle您的直播

void Main() 
{ 
    var subject = new Subject<int>(); 
    var source = subject.Publish().RefCount(); 

    var query = source 
     // Start counting on a 1, wait 5 seconds, and take the last value 
     .Throttle(x => Observable.Timer(TimeSpan.FromSeconds(5))); 

    using(query.Subscribe(Console.WriteLine)) 
    { 
     // This sequence should produce a one 
     subject.OnNext(1); 
     subject.OnNext(0); 
     subject.OnNext(1); 
     subject.OnNext(0); 
     subject.OnNext(1); 
     subject.OnNext(1); 
     Console.ReadLine(); 
     // This sequence should produce a zero 
     subject.OnNext(0); 
     subject.OnNext(0); 
     subject.OnNext(0); 
     subject.OnNext(0); 
     subject.OnNext(1); 
     subject.OnNext(0); 
     Console.ReadLine(); 
    } 
}