2010-07-09 45 views
3

所以我只是玩弄RX并学习它。我开始玩事件,并想知道如何订阅事件,并以异步方式批量处理结果。请允许我用代码解释:反应框架(RX)和异步处理事件

是引发事件的简单类:

public class EventRaisingClass 
{ 
    public event EventHandler<SomeEventArgs> EventOccured; 

    //some other code that raises event... 
} 

public class SomeEventArgs : EventArgs 
{ 
    public SomeEventArgs(int data) 
    { 
     this.SomeArg = data; 
    } 

    public int SomeArg { get; private set; } 
} 

然后我公司主营:

public static void Main(string[] args) 
{ 
    var eventRaiser = new EventRaisingClass(); 
    IObservable<IEvent<SomeEventArgs>> observable = 
     Observable.FromEvent<SomeEventArgs>(e => eventRaiser.EventOccured += e, e => eventRaiser.EventOccured -= e); 

    IObservable<IList<IEvent<SomeEventArgs>>> bufferedEvents = observable.BufferWithCount(100); 

    //how can I subscribte to bufferedEvents so that the subscription code gets called Async? 
    bufferedEvents.Subscribe(list => /*do something with list of event args*/); //this happens synchrounously... 

} 

正如你可以在我的意见看,当你只需要调用订阅像,所有订阅代码都会同步发生。有没有一种方法可以使用RX让订阅在不同线程上被调用,只要有一批新的事件可以工作?

回答

2

我相信你正在寻找SubscribeOnObserveOn,通过IScheduler。在System.Concurrency下有几个内置的调度程序;其中一些使用任何线程当前,其他人使用特定的线程。

This video有关于调度程序概念的更多信息。

Rx团队最近还发布了一个hands-on labs文档,该文档是目前最接近教程的文档。

2
bufferedEvents.ObserveOn(Scheduler.TaskPool).Subscribe(... 

SubscribeOn是指定在其所谓的“订阅副作用”正在发生的时间表。例如,每当有人订阅时,您的observable可以打开一个文件。

ObserveOn是指定每次有新值时对观察者的调用发生的时间表。实际上,它比SubscribeOn更经常使用。