我有一个股票报价序列进来,我想在最后一小时采取所有的数据,并对其做一些处理。我试图用反应式扩展2.0实现这一点。我在另一篇文章上阅读使用Interval,但我认为这已被弃用。无功扩展滑动时间窗口
回答
这会扩展方法解决问题了吗?
public static IObservable<T[]> RollingBuffer<T>(
this IObservable<T> @this,
TimeSpan buffering)
{
return Observable.Create<T[]>(o =>
{
var list = new LinkedList<Timestamped<T>>();
return @this.Timestamp().Subscribe(tx =>
{
list.AddLast(tx);
while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
{
list.RemoveFirst();
}
o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
}, ex => o.OnError(ex),() => o.OnCompleted());
});
}
很可能Buffer
是你在找什么:
var hourlyBatch = ticks.Buffer(TimeSpan.FromHours(1));
我认为这会产生1小时数据的非重叠窗口,而我基本上每次有新值进入时都需要一个滑动窗口。 – 2012-07-25 08:04:00
啊......这就是ticks.Window(TimeSpan.FromHouse(1))。 – 2012-07-25 15:10:35
这将给1小时窗口,但不会重叠窗口。您需要提供窗口打开和关闭观察值,或跳过计数/次。 – 2015-07-07 06:24:34
您正在寻找窗口操作员! 这是一个漫长的文章中,我与巧合的序列工作(重叠序列之窗) http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html
所以写了,如果你想建立你可以使用这种代码的滚动平均值
var scheduler = new TestScheduler();
var notifications = new Recorded<Notification<double>>[30];
for (int i = 0; i < notifications.Length; i++)
{
notifications[i] = new Recorded<Notification<double>>(i*1000000, Notification.CreateOnNext<double>(i));
}
//Push values into an observable sequence 0.1 seconds apart with values from 0 to 30
var source = scheduler.CreateHotObservable(notifications);
source.GroupJoin(
source, //Take values from myself
_=>Observable.Return(0, scheduler), //Just the first value
_=>Observable.Timer(TimeSpan.FromSeconds(1), scheduler),//Window period, change to 1hour
(lhs, rhs)=>rhs.Sum()) //Aggregation you want to do.
.Subscribe(i=>Console.WriteLine (i));
scheduler.Start();
而且我们可以看到它在输入值时输出滚动条。
0,1,3,6,10,15,21,28 ...
我无法获得此代码的工作。 '''rhs.Sum()'''生成'''IObservable
道歉,我纠正了答案。当使用'TestScheduler'确保准确时,Observable.Return(0)需要传入的调度器,即Observable.Return(0,scheduler)。也许更有意义的值是Observable.Return(Unit.Default,scheduler)'或'Observable.Empty
或者假设数据已经Timestamp
版,只需使用Scan
:
public static IObservable<IReadOnlyList<Timestamped<T>>> SlidingWindow<T>(this IObservable<Timestamped<T>> self, TimeSpan length)
{
return self.Scan(new LinkedList<Timestamped<T>>(),
(ll, newSample) =>
{
ll.AddLast(newSample);
var oldest = newSample.Timestamp - length;
while (ll.Count > 0 && list.First.Value.Timestamp < oldest)
list.RemoveFirst();
return list;
}).Select(l => l.ToList().AsReadOnly());
}
- 1. 如何在滑动窗口中获取当前窗口时间?
- 2. WPF扩展窗口
- 3. 无功扩展计时器
- 4. 创建os浮动窗口Firefox扩展
- 5. SQL滑动窗口聚集(不使用窗口功能)
- 6. WPF滑动窗口
- 7. 滑动窗口集
- 8. opencv滑动窗口
- 9. 滑动窗口seq
- 10. CustomControl滑动窗口
- 11. 地址窗口扩展
- 12. WPF - 向左扩展窗口
- 13. 如何端口Chrome扩展功能的Firefox扩展功能
- 14. 如何扩展jQuery滑块的功能
- 15. 用于记录分析的滑动时间窗口
- 16. data.frame中列的基于时间的平均(滑动窗口)
- 17. 你可以级联滑动时间窗口火花流
- 18. 滑动处理时间窗口计算不一致的结果
- 19. 使用滑动窗口FFT
- 20. R data.table滑动窗口
- 21. ExtJs窗口滑动问题
- 22. 跳过滑动窗口
- 23. 无功扩展(Rx)+ MVVM =?
- 24. 无功扩展与FileSystemWatcher
- 25. 单声道无功扩展?
- 26. Drools:滑动窗口:基于时间:在给定时间后缩回
- 27. 差异扩展Composite和扩展指定的窗口小部件
- 28. 窗口上的Posix和intl扩展
- 29. Java FX - 不可扩展窗口
- 30. 扩展Skype的业务电话窗口
你想有价值的最后一个小时,每次有新价值时,或者你只是想每小时价值一小时的股票价格? – Enigmativity 2012-07-20 01:32:24
我希望在每次有新值出现时每次值的最后一小时。我已经看过Buffer,但我认为它不是正确的。 – 2012-07-25 08:02:21