2014-11-03 67 views
0

新的Rx;试图弄清楚。消费流,转型,然后交给其他消费者(没有州)的方法

我正在通过事件从加速度计获取数据,将数据调整为我自己的格式,然后通过反应式扩展向其他消费者提供数据流。

我的方法部分是在调用一个重要的构造函数。有人可以看看这个,并帮助我理解我应该做什么不同吗?

希望,这段代码就足够了:

public class accel_raw_producer : IObservable<AccelerometerFrame_raw> 
{ 
    private static Spatial spatial = null; 
    private IObservable<EventPattern<SpatialDataEventArgs>> spatialEvents; 

    public accel_raw_producer() 
    { 
     spatial = new Spatial(); 
     spatial.close(); 
     spatialEvents = System.Reactive.Linq.Observable.FromEventPattern 
     <SpatialDataEventHandler, SpatialDataEventArgs>(
     handler => handler.Invoke, 
     h => spatial.SpatialData += h, 
     h => spatial.SpatialData -= h); 
     subscription = spatialEvents.Subscribe(); 
     spatial.open(-1); 
    } 


    public IDisposable Subscribe(IObserver<AccelerometerFrame_raw> observer) 
    { 
     return 
     (from evt in spatialEvents 
     let e = evt.EventArgs 
     select new AccelerometerFrame_raw 
     (
      e.spatialData[0].Acceleration[0], 
      e.spatialData[0].Acceleration[1], 
      e.spatialData[0].Acceleration[2], 
      e.spatialData[0].AngularRate[0], 
      e.spatialData[0].AngularRate[1], 
      e.spatialData[0].AngularRate[2] 
     )).Subscribe(); 
    } 
} 

public consumerClass :IObserver<AccelerometerFrame_raw> 
{ 
    accel_raw_producer accelStream; 
    IDisposable Unsubscriber; 

    public consumerClass() 
    { 
     accelStream = new accel_raw_producer(); 
     Unsubscriber = accelStream.Subscribe(this); 
    } 

    public void OnCompleted() 
    { 
     throw new NotImplementedException(); 
    } 

    public void OnError(Exception error) 
    { 
     throw new NotImplementedException(); 
    } 

    public void OnNext(AccelerometerFrame_raw accelFrame) 
    { 
     if (null == accelFrame) return; 
     AccelX = accelFrame.Acceleration.X; 
     AccelY = accelFrame.Acceleration.Y; 
     AccelZ = accelFrame.Acceleration.Z; 
     GyroX = accelFrame.Rotation.X; 
     GyroY = accelFrame.Rotation.Y; 
     GyroZ = accelFrame.Rotation.Z; 
    }  
} 

当我设置在AccelerometerFrame_raw断点,它就会被触发,但值不会传播到消费者。所以这种方法中的某些东西需要有所不同。

回答

3

我不确定这里是否遗漏了一些细节,但是您似乎没有使用观察者输入参数,而订阅也没有对onNext值做任何事情。就个人而言,我会考虑这个摆动周围,只是传递出的IObservable,让来电者订阅它自己:

public IObservable<AccelerometerFrame_raw> AccelerometerFrames() 
{ 
    return 
     from evt in spatialEvents 
     let e = evt.EventArgs 
     select new AccelerometerFrame_raw 
     (
      e.spatialData[0].Acceleration[0], 
      e.spatialData[0].Acceleration[1], 
      e.spatialData[0].Acceleration[2], 
      e.spatialData[0].AngularRate[0], 
      e.spatialData[0].AngularRate[1], 
      e.spatialData[0].AngularRate[2] 
     ); 
} 
+0

当我尝试,我得到一个编译错误:无法隐式转换类型“System.IObservable '到'System.IDisposable'。存在明确的转换(你是否缺少一个转换吗?)这就是让我尝试了几个例如.AsObservable()和.Subscribe()以查看它们是否工作的原因。另请注意,我在原始帖子中添加了更多代码,以便您可以看到生产者类和消费者类之间的关系。 OnNext在消费者阶层中。 – philologon 2014-11-04 02:41:00

+0

好的,我最近的评论有些问题。我没有意识到你已经改变了方法的返回类型。现在我明白了,这让我怀疑,如果我这样做了,那么我在Subscribe方法中放什么? – philologon 2014-11-04 02:50:33

+1

什么都没有 - 你不要自己动手。您返回observable,以便调用者可以自己调用'Subscribe(onNext)'。 – 2014-11-04 05:06:34