2016-06-14 113 views
0

使用Observable.Buffer(TimeSpan timeSpan) Method,其将流进10分钟块及申报IList,工作正常的Rx - 开始缓冲onNext

var stream = Observable.FromEventPattern<*>(*,*); 
stream.Buffer(TimeSpan.FromSeconds(10)); 

试图实现更复杂的行为

  • 开始区块(缓冲的事件列表)当新事件被推入流(而不是每10秒)
  • 继续,直到任何情况下被推到流为x
+0

不确定这里有什么问题 – Mathieu

回答

0

缓冲事件试试这个:

var query = 
    stream.Publish(
     ps => ps.Window(
      () => ps.Delay(TimeSpan.FromSeconds(1.0)).Take(1))); 
0

我相信有相当你能做到这一点的几种方式。

我这里有一个工作测试例子

void Main() 
{ 
    var scheduler = new TestScheduler(); 
    var stream = scheduler.CreateColdObservable(
     ReactiveTest.OnNext(1.Seconds(), 'A'), 
     ReactiveTest.OnNext(2.Seconds(), 'B'), 
     ReactiveTest.OnNext(13.Seconds(), 'C') 
     ); 

    var observer = scheduler.CreateObserver<string>(); 

    var query = stream.Publish(s => { 
     return s.Timeout(TimeSpan.FromSeconds(10), Observable.Empty<char>(), scheduler) 
      .ToList() 
      .Where(buffer=>buffer.Any())  
      //Project to string to make equality test easier for the example.  
      .Select(buffer=>string.Join(",", buffer)) 
      .Repeat(); 
    }); 

    query.Subscribe(observer); 

    scheduler.AdvanceBy(100.Seconds()); 

    ReactiveAssert.AreElementsEqual(
     new []{ 
      ReactiveTest.OnNext(12.Seconds(), "A,B"), 
      ReactiveTest.OnNext(23.Seconds(), "C") 
     }, 
     observer.Messages); 
} 

// Define other methods and classes here 
public static class TimeEx 
{ 
    public static long Seconds(this int seconds) 
    { 
     return TimeSpan.FromSeconds(seconds).Ticks; 
    } 
} 

注意在这里,我只是做了缓冲列表的字符串,使其更容易验证的平等。即"A,B"代替{'A', 'B'}

其他可供考虑的方案是WindowGroupJoin运营商能够做到这一点 - 看到http://www.introtorx.com/content/v1.0.10621.0/17_SequencesOfCoincidence.html。我确信其他运营商可以像SwitchSelectTimer,Timeout等缝合在一起,以获得您的结果。

相关问题