我有以下Rx扩展方法用于分区IEnumerable <T>并延迟生成每个分区值。它使用IEnumerable <T>扩展来划分数据,这也用单元测试显示。分割可观察的IEnumerable流<T>延迟反应扩展
有没有比使用Observable.Timer()。Wait()方法调用更好的方式来执行'延迟'?
public static class RxExtensions
{
public static IObservable<IEnumerable<T>> PartitionWithInterval<T>(this IObservable<IEnumerable<T>> source, int size, TimeSpan interval, IScheduler scheduler = null)
{
if (scheduler == null)
{
scheduler = TaskPoolScheduler.Default;
}
var intervalEnabled = false;
return source.SelectMany(x => x.Partition(size).ToObservable())
.Window(1)
.SelectMany(x =>
{
if (!intervalEnabled)
{
intervalEnabled = true;
}
else
{
Observable.Timer(interval, TaskPoolScheduler.Default).Wait();
}
return x;
})
.ObserveOn(scheduler);
}
}
public static class EnumerableExtensions
{
public static IEnumerable<IEnumerable<T>> Partition<T>(this IEnumerable<T> source, int size)
{
using (var enumerator = source.GetEnumerator())
{
var items = new List<T>();
while (enumerator.MoveNext())
{
items.Add(enumerator.Current);
if (items.Count == size)
{
yield return items.ToArray();
items.Clear();
}
}
if (items.Any())
{
yield return items.ToArray();
}
}
}
}
试验与Rx扩展方法如下:
static void Main(string[] args)
{
try
{
var data = Enumerable.Range(0, 10);
var interval = TimeSpan.FromSeconds(1);
Observable.Return(data)
.PartitionWithInterval(2, interval)
.Timestamp()
.Subscribe(x =>
{
var message = $"{x.Timestamp} - count = {x.Value.Count()}, values - {x.Value.First()}, {x.Value.Last()}";
Console.WriteLine(message);
});
Console.ReadLine();
}
catch (Exception e)
{
Console.WriteLine(e);
}
}