2013-10-05 30 views
5

我开始使用反应式扩展(版本2.1,以防万一)进行开发,对于我的示例应用程序,我需要一系列按某些间隔推送的int值,即每1秒。如何使用Rx(反应性扩展)粘贴相对延迟

我知道,我可以用Observable.Range<int>(0,10)创建一个序列,但我无法弄清楚如何设置推送之间的相对时间。我试过Delay(),但是在开始时只将序列移动一次。

我随后发现Observable.Generate()方法,可以进行调整,以在明年方式这一任务:

var delayed = Observable. 
       Generate(0, i => i <= 10, i => i + 1, i => i, 
          i => TimeSpan.FromSeconds(1)); 

不过,这似乎是“换每个般”的定义序列的简单唯一的工作。 所以,一般来说,我的问题是,我们是否可以获得任何源序列,并用一些代理来包装它,这些代理将从源中获取消息并延时推进?

S--d1--d2--d3--d4--d5-| 
D--d1-delay-d2-delay-d3-delay-d4-delay-d5-| 

P.S.如果这种方法与ReactiveExtensions的概念相矛盾,请注意这一点。我不想“无论如何”这样做,他们将来也会遇到其他一些设计问题。

P.P.S总体思路是,以确保输出序列具有事件尽管如果输入序列是有限的还是无限的,以及它推动事件之间指定的时间间隔。

回答

7

Observable.Interval是你想要看的。它会生成一个基于0的长值由1所指定例如为:

Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine(x)); 

然后可以使用投影(Select),以抵消/根据需要修改这个值的时候每个间隔递增。

你也可以通过使用Zip操作符来“调整”另一个流 - 你可能也想看看这个。 Zip将来自两个流的事件组合在一起,因此它以当前最慢的流的速度发射。 Zip也非常灵活,它可以压缩任意数量的流,甚至可以将IObservable压缩到IEnumerable。下面是一个例子:

var pets = new List<string> { "Dog", "Cat", "Elephant" }; 
var pace = Observable.Interval(TimeSpan.FromSeconds(1)) 
    .Zip(pets, (n, p) => p)  
    .Subscribe(x => Console.WriteLine(x),() => Console.WriteLine("Done")); 

这会以1秒的时间间隔写出宠物。

根据P.P.S.在上面加上,我会给出另一个答案 - 我会留下这个参考,因为无论如何这是一个有用的技术。

+1

感谢您的回答。 Zip()的破解工作几乎没有问题。这里的一个缺陷是,如果“输入”序列在时间上是无限且不可预知的(即发生按钮点击)。有了它,如果按钮没有被点击 - 比如说10秒 - “输入”中没有事件,但是“计时器”序列产生了10个事件。然后,如果我开始快速点击按钮,所有来自'输入'的新事件都会与来自'timer'的现有事件成对压缩,因此'结果'序列也将以'输入'的推送速度拉动,这不是我想。 – Antonio

+0

我会在'timer'中添加如下检查:_on新事件't_i',检查'input'中是否有事件,如果没有't_i',则检查't_i'。 – Antonio

+0

关于点击一个按钮的位是所有新闻给我:)这不是在你的问题。也许你可以纯粹根据涉及的实际数据和期望的用户体验来重新说明问题? –

2

因此,为了澄清,您希望输出以不超过间隔的速率推动输入,但其他方式尽可能快。

在这种情况下,试试这个。变化的构造是一种奇怪的方式来创建一个短暂的零星序列,有时更快,有时比2秒的步伐慢。请注意,秒表的输出将显示Rx使用的定时器机制中的小错误。

var input = Observable.Interval(TimeSpan.FromSeconds(1)).Take(4); 
input = input.Concat(Observable.Interval(TimeSpan.FromSeconds(5)).Take(2)); 

var interval = TimeSpan.FromSeconds(2); 

var paced = input.Select(i => Observable.Empty<long>() 
             .Delay(interval) 
             .StartWith(i)).Concat(); 

var stopwatch = new Stopwatch(); 
stopwatch.Start(); 
paced.Subscribe(
    x => Console.WriteLine(x + " " + stopwatch.ElapsedMilliseconds), 
    () => Console.WriteLine("Done")); 

此示例的工作原理是突出从输入每个刻度到具有刻度作为在开始的单个事件的序列,但不为的onComplete所需的间隔。然后将所得的流串联起来。这种方法可以确保当前输出结果是“刷新”时立即发出新的报价,但是否则会缓冲到另一个之后。

你可以用扩展方法来包装它,以使其通用。

0

下面就做你想做的最简单的方法:

var delayed = 
    source.Do(x => Thread.Sleep(1000)); 

它增加了第二个延迟,但它的第一个项目之前这样做。你当然可以总结一些逻辑,不要在开始时拖延。这不会太难。


下面是一个替代方案,用于安排全新趋势的延迟。

var delayed = 
    Observable.Create<int>(o => 
    { 
     var els = new EventLoopScheduler(); 
     return source 
      .ObserveOn(els) 
      .Do(x => els.Schedule(() => Thread.Sleep(1000))) 
      .Subscribe(o); 
    }); 
+0

我不得不说,我不是这种方法的粉丝有几个原因。它没有单元测试 - 与Delay不同,您没有机会参与等待调度程序。它也会不必要地阻塞线程。它还使用Do,它按用户运行,因此您可以为意想不到的副作用(想象下游运营商进行双重订阅)提供机会。最后,作为上述结果,这不是惯用的Rx –

+0

@JamesWorld - 是的,我同意我的第一个选择不是最好的。第二个比较好,因为它不会阻塞任何现有的线程。 – Enigmativity

0

也许你正在寻找的是Buffer扩展方法。它的签名的定义如下:

public static IObservable<IList<TSource>> Buffer<TSource>(
    this IObservable<TSource> source, 
    TimeSpan timeSpan) 

它将在该值在批尽可能经常timeSpan产生的方式变换源序列。

0

我知道这是一个老问题,但我想我有正确的答案。

即使源Observable没有产生任何东西,压缩Observable.Timer也会产生'ticks'。这意味着一旦源生成另一个项目,将使用已经生成但尚未消耗的任何蜱。当生产者以稳定的速度生产物品时,导致物品之间会产生延迟,但如果生产者有时需要较长时间才能生产物品,则会产生物品爆发。

为了避免这种情况,您需要生成一个只能生成一个项目的计时器,该计时器在由可观察项生成的每个项目之间。你可以用Observable.Switch这样做:

var subject = new Subject<Unit>(); 

     var producer = subject.SelectMany(
            _ => 
            { 
             return new[] 
             { 
              Observable.Return(true), 
              Observable.Timer(TimeSpan.FromSeconds(2)) 
                .Select(q => false) 
             }; 
            }) 
           .Switch();