2017-05-17 33 views
1

我有以下Rx扩展方法用于分区IEnumerable <T>并延迟生成每个分区值。它使用IEnumerable <T>扩展来划分数据,这也用单元测试显示。分割可观察的IEnumerable流<T>延迟反应扩展

有没有比使用Observable.Timer()。Wait()方法调用更好的方式来执行'延迟'?

public static class RxExtensions 
{ 
    public static IObservable<IEnumerable<T>> PartitionWithInterval<T>(this IObservable<IEnumerable<T>> source, int size, TimeSpan interval, IScheduler scheduler = null) 
    { 
     if (scheduler == null) 
     { 
      scheduler = TaskPoolScheduler.Default; 
     } 

     var intervalEnabled = false; 
     return source.SelectMany(x => x.Partition(size).ToObservable()) 
      .Window(1) 
      .SelectMany(x => 
      { 
       if (!intervalEnabled) 
       { 
        intervalEnabled = true; 
       } 
       else 
       { 
        Observable.Timer(interval, TaskPoolScheduler.Default).Wait(); 
       } 

       return x; 
      }) 
      .ObserveOn(scheduler); 
    } 
} 

public static class EnumerableExtensions 
{ 
    public static IEnumerable<IEnumerable<T>> Partition<T>(this IEnumerable<T> source, int size) 
    { 
     using (var enumerator = source.GetEnumerator()) 
     { 
      var items = new List<T>(); 
      while (enumerator.MoveNext()) 
      { 
       items.Add(enumerator.Current); 
       if (items.Count == size) 
       { 
        yield return items.ToArray(); 

        items.Clear(); 
       } 
      } 

      if (items.Any()) 
      { 
       yield return items.ToArray(); 
      } 
     } 
    } 
} 

试验与Rx扩展方法如下:

static void Main(string[] args) 
{ 
    try 
    { 
     var data = Enumerable.Range(0, 10); 
     var interval = TimeSpan.FromSeconds(1); 

     Observable.Return(data) 
      .PartitionWithInterval(2, interval) 
      .Timestamp() 
      .Subscribe(x => 
       { 
        var message = $"{x.Timestamp} - count = {x.Value.Count()}, values - {x.Value.First()}, {x.Value.Last()}"; 
        Console.WriteLine(message); 
       }); 

      Console.ReadLine(); 
     } 
     catch (Exception e) 
     { 
      Console.WriteLine(e); 
     } 
} 

回答

1

这应做到:

public static IObservable<IEnumerable<T>> PartitionWithInterval<T>(this IObservable<IEnumerable<T>> source, int size, TimeSpan interval, IScheduler scheduler = null) 
{ 
    if (scheduler == null) 
    { 
     scheduler = TaskPoolScheduler.Default; 
    } 

    return source 
     //don't need the .ToObservable() call, since Zip can work on IEnumerable + IObservable. 
     .SelectMany(x => x.Partition(size)) 
     .Zip(Observable.Interval(interval, scheduler).StartWith(0), (x, _) => x) 
     .ObserveOn(scheduler); 
} 

滑稽PartitionWithInterval如何实际调用PartitionInterval

StartWith就在那里,所以你立即得到一个分区:类似于你有intervalEnabled标志。

0

感觉就像你必须使用运营商Buffer。试试这个:

  data.ToObservable() 
       .Buffer(2) 
       .Zip(Observable.Interval(interval), (x, _) => x) 
       .Timestamp() 
       .Subscribe(x => 
        { 
         var message = $"buffer {x.Timestamp} - count = {x.Value.Count()}, values - {x.Value.First()}, {x.Value.Last()}"; 
         Console.WriteLine(message); 
        });