2017-02-16 72 views
0

我正在开发一个解决方案,例如为用户发生高速率更改。 我需要的是记录每个更改,延迟通知,然后返回不同的更改,例如已更改用户的用户ID。 我想出了下面的代码使用RX:可观察缓冲区

private Subject<int> userEventSubject= new Subject<int>(); 
userEventSubject 
    .Buffer(EVENT_BUFFER_DELAY) 
    .Where(buffer => buffer.Count > 0)     
    .Subscribe(OnEventBufferProcess); 

这似乎正常工作和我得到

userEventSubject.OnNext(userId); 

我的问题中添加的所有值是:我能明显的变化例如,当具有多个具有相同用户标识值的OnNext时,我不希望生成的缓冲区包含重复项。当然,我可以在订阅处理程序中使用不同的值,但是我想知道这是否可以在rx级别上完成?我尝试了独特的方法,但仍然可以获得所有的值。

由于我只想跟踪在延迟期间所做的更改,返回订阅处理程序并重新开始,是否需要清除userEventSubject?

回答

3

如果您使用Observable.Distinct方法,您将检查不同的IList<int> s,这确实不是您想要的行为。

看来你需要将独特的元素应用到缓冲区内的元素。

可以使用Observable.Select方法将此每一个缓冲区:

private Subject<int> userEventSubject= new Subject<int>(); 
userEventSubject 
    .Buffer(EVENT_BUFFER_DELAY) 
    .Where(buffer => buffer.Count > 0) 
    .Select(buffer => buffer.Distinct()) // distinct elements in each buffer 
    .Subscribe(OnEventBufferProcess); 

要知道,当你使用Distinct()方法,你需要警惕你是如何检查的平等,如果你是参考工作类型 - 你可能需要实现某种相等比较器。在这种情况下,这不是问题,因为您正在使用int s。

+0

是的,我明白,我需要有一个平等的比较器参考类型。但是有一个问题,OnEventBufferProcess是异步的吗?正如我理解的正确,而OnEventBufferProcess仍然工作任何调用OnNext将被阻止? – NullReference

+1

@NullReference是的,关于阻止你是正确的。有关于通过异步方法订阅的一些问题,解决方案取决于您要查找的确切行为。看到这里例如:http://stackoverflow.com/questions/18814805/is-there-a-way-to-subscribe-an-observer-as-async – TheInnerLight

+0

什么是需要做的处理单独的线程/线程所以OnNext永远不会被阻止。据我所知,ObserveOn将允许我指定调度器应在哪些回调函数上运行。 因此,ObserveOn(ThreadPoolScheduler.Instance)将使回调在一个ThreadPool线程上运行,并且只要ThreadPool具有可用线程就不会阻塞? 从测试我可以看出,它似乎按预期工作:) – NullReference

0

你可以使用DistinctUntilChanged扩展方法:

private Subject<int> userEventSubject= new Subject<int>(); 
userEventSubject 
    .DistinctUntilChanged() 
    .Buffer(EVENT_BUFFER_DELAY)  
    .Where(buffer => buffer.Count > 0)     
    .Subscribe(OnEventBufferProcess);