我和我的同事有争议。我们正在编写处理大量数据的.NET应用程序。它接收数据元素,根据某些标准将它们的子集分组成块,并处理这些块。我应该在我的接口公开的IObservable <T>?
比方说,我们有Foo
抵达的一些源类型的数据项(从网络,例如)一个接一个。我们希望收集亚Foo
类型的相关对象的,从Bar
类型的每个这样的子集和处理的对象构建Bar
类型的对象。
我们中提出了以下设计。它的主题是直接从我们组件的接口暴露IObservable<T>
对象。
// ********* Interfaces **********
interface IFooSource
{
// this is the event-stream of objects of type Foo
IObservable<Foo> FooArrivals { get; }
}
interface IBarSource
{
// this is the event-stream of objects of type Bar
IObservable<Bar> BarArrivals { get; }
}
/********* Implementations *********
class FooSource : IFooSource
{
// Here we put logic that receives Foo objects from the network and publishes them to the FooArrivals event stream.
}
class FooSubsetsToBarConverter : IBarSource
{
IFooSource fooSource;
IObservable<Bar> BarArrivals
{
get
{
// Do some fancy Rx operators on fooSource.FooArrivals, like Buffer, Window, Join and others and return IObservable<Bar>
}
}
}
// this class will subscribe to the bar source and do processing
class BarsProcessor
{
BarsProcessor(IBarSource barSource);
void Subscribe();
}
// ******************* Main ************************
class Program
{
public static void Main(string[] args)
{
var fooSource = FooSourceFactory.Create();
var barsProcessor = BarsProcessorFactory.Create(fooSource) // this will create FooSubsetToBarConverter and BarsProcessor
barsProcessor.Subscribe();
fooSource.Run(); // this enters a loop of listening for Foo objects from the network and notifying about their arrival.
}
}
其他建议,它的主题是用我们自己的发布/订阅接口和仅在需要时使用的Rx的实现内的另一个设计。
//********** interfaces *********
interface IPublisher<T>
{
void Subscribe(ISubscriber<T> subscriber);
}
interface ISubscriber<T>
{
Action<T> Callback { get; }
}
//********** implementations *********
class FooSource : IPublisher<Foo>
{
public void Subscribe(ISubscriber<Foo> subscriber) { /* ... */ }
// here we put logic that receives Foo objects from some source (the network?) publishes them to the registered subscribers
}
class FooSubsetsToBarConverter : ISubscriber<Foo>, IPublisher<Bar>
{
void Callback(Foo foo)
{
// here we put logic that aggregates Foo objects and publishes Bars when we have received a subset of Foos that match our criteria
// maybe we use Rx here internally.
}
public void Subscribe(ISubscriber<Bar> subscriber) { /* ... */ }
}
class BarsProcessor : ISubscriber<Bar>
{
void Callback(Bar bar)
{
// here we put code that processes Bar objects
}
}
//********** program *********
class Program
{
public static void Main(string[] args)
{
var fooSource = fooSourceFactory.Create();
var barsProcessor = barsProcessorFactory.Create(fooSource) // this will create BarsProcessor and perform all the necessary subscriptions
fooSource.Run(); // this enters a loop of listening for Foo objects from the network and notifying about their arrival.
}
}
你认为哪一个更好?揭露IObservable<T>
,使我们的组件创建新的事件流正在从RX运营商,或定义我们自己的发布/订阅接口,如果需要,内部使用的Rx?
这里有一些事情要考虑有关设计:
在第一次设计了接口的消费者有Rx的整个电力在他/她的指尖,可以执行任何的Rx运营商。我们中的一个人声称这是一个优势,另一个人声称这是一个缺点。
第二个设计允许我们使用任何发布/订阅架构引擎盖下。第一个设计将我们与Rx联系在一起。
如果我们希望使用Rx的功率,它需要在第二个设计中更多的工作,因为我们需要翻译的自定义发布/订阅实施的Rx和背部。它需要为每个希望进行事件处理的类编写胶水代码。
我喜欢你开始这个问题的方式。 “我和我的同事有争议。” +1。 – 2012-07-09 11:40:23
为什么不公开所有IObservable东西作为*扩展方法*来处理所有“胶水代码”。保持所有IObs与您的对象模型分离,同时提供选项。 '公共IObservable AsObservable(这个IPublisher 发布者)'或类似的东西 –
Will
2012-07-09 12:24:04
你做得很好,以平衡的方式提出问题。 – 2012-07-09 12:46:42