2016-12-31 89 views
0

我正在访问属于不同计算过程的内存区域。 该地区的变化相对较少,我需要在发生变化时进行计算。我收到有关更改的通知,但我需要稍等一下,以确保不再进行更改。我像这个模型:如果有新物品到达可观察物体,我可以检查观察者吗?

var readyToBeProcessed = changed 
    .Select(x => DateTime.Now) 
    .Throttle(TimeSpan.FromSeconds(5)); 

但是我的计算需要相当一段时间,有可能是内存的变化,而我做他们。在这种情况下,我需要将这一轮计算标记为无效。

但是我怎么知道我的观察,当我完成了计算,如果另一个事件到达与否,在处理当前事件?如果自开始计算以来没有事件到达,那么它是有效的,我可以存储结果。

在实践中,这是非常罕见,该事件中,使计算变得无效的模式(速度不够快)到达,我还是想以应付这种情况。

注:我知道,我不能保证始终有效的计算。记忆变化和我收到事件之间有一小段时间。这是完全可能的,序列是这样的1)我正在做计算2)内存变化3)我完成计算并检查事件,并确定计算有效4)内存变化事件到达。我很高兴与此住了,现在

readyToBeProcessed.Subscribe(x => 
{ 
    Log.Info("Start work..."); 
    // Do calculation here 
    ... 
    // When finished 
    if (Is there a new item) 
    { 
     Log.Info("There were changes while we worked... Invalidating"); 
     Invalidate(); 
    } 
    else 
    { 
     Log.Info("Succeeded"); 
    } 
}, cancellationToken); 

是反应坏适合这项任务?

回答

1

的Rx实际上是一个伟大的选择,在这里,我觉得,虽然你可能需要多一点明确建模。

想想事实上有五种类型的事件:项目的变化,做 - 工作开始,做 - 工作结束,失效,成功(我希望我可以使用更好的名字,但我正在努力什么你写了)。

这里是他们是如何工作的一个大理石图:

t(sec)  : 0--1--2--3--4--5--6--7--8--9--10-11-12-13-14-15-16... 
item-change : *-*--**-----------------*-------------------------... 
do-Work-begins: ---------------------*-----------------*----------... 
do-Work-ends : -------------------------*------------------*-----... 
invalidate : -------------------------*------------------------... 
succeeded  : --------------------------------------------*-----... 

我们开始工作,一旦出现了项目变化5秒平静。如果在工作时间内有任何变化,我们希望在工作完成时失效。如果不是,我们希望观察成功。

var doWorkBegins = changed 
    .Select(x => DateTime.Now) 
    .Throttle(TimeSpan.FromSeconds(5)); 

var doWorkEnds = doWorkBegins 
    .SelectMany(x => 
    { 
     Log.Info("Start work..."); 
     // DoWork(); 
     // 
     // should return an observable that returns a single value when complete. 
     // If DoWork is just a void, then can use 
     // return Observable.Return(Unit.Default); 
    }); 

var lists = changed 
    .Buffer(() => doWorkEnds) 
    .Publish().RefCount(); 

var succeeded = lists 
    .Where(l => l.Count == 0); 

var invalidate = lists 
    .Where(l => l.Count > 0); 

invalidate.Subscribe(x => 
{ 
     Log.Info("There were changes while we worked... Invalidating"); 
     Invalidate(); 
}, cancellationToken); 

succeeded.Subscribe(x => 
{ 
    Log.Info("Succeeded"); 
}, cancellationToken); 
1

理想情况下,我会建议你使用一个Task保持你的工作轨道,那么你可以使用:

readyToBeProcessed 
.Select(evt => Observable.StartAsync<Unit>(async (cancellationToken) => 
{   
    //pass cancellationToken to work 
    var result = await DoWork(cancellationToken); 
    //test token if needed 
    return result; 
})) 
.Switch() 
.Subscribe(); 

当下次邮件到达,当前令牌将被取消。

+1

'Switch'是这里的关键。您将计算结果“投影”到另一个异步值(a.k.a未来,任务或一个值的可观察序列)。现在你有一系列的序列。当Switch()运算符到达时,它将订阅内部序列,但取消该订阅(以及当更新的内部序列到达时,基本的“任务”)。http://www.introtorx.com/content/v1.0.10621 .0/17_SequencesOfCoincidence.html –

+0

切换器将无法工作,原因有两个:其中,“切换键”是项目,不可以进行处理。其次,他的描述听起来像DoWork充满了副作用,这只有在正确兑现取消标记的情况下才会起作用。由于有一种方法失效,我假设它没有。 – Shlomo

+0

尽管可以在取消标记上注册“无效”。 – Asti