2017-07-02 86 views
2

的问题,我试图解决Rx.Net - 获取股票价格的变化,并进行处理

  1. 获取股票蜱
  2. 始终考虑最新股价
  3. 每个X秒取快照的蜱并发送处理

所以我有一个Observable股票蜱的来源。它只发送我感兴趣的股票的蜱。我需要做的是收到这个股票价格,并在每个x秒(例如,让我们说每3秒)发送一个快照处理价格。如果在3秒钟内收到同一只股票的2个蜱虫,我只需要最新的蜱虫。这个处理计算量很大,所以如果可能的话,我希望避免两次处理相同的股票价格。

举一个例子。

假设在序列的开始,我收到2个刻度 - >MSFT:1 $,GOOG:2 $

在接下来的3秒内我什么也没收到,所以MSFT & GOOG应该发送处理滴答。

现在下一秒我收到新的节拍 - >MSFT:1 $,GOOG:3 $,INTL:3 $

再次让我们接下来的3秒没有一样是在范围内承担

这里。 ,因为MSFT价格没有变化(它仍然是1美元),只有GOOG & INTL应该发送处理。

而这一直重复一整天。

现在,我认为Rx帮助解决这种问题,容易&优雅的方式。但我有一个问题,有适当的查询。 这是我到目前为止,会尽量解释它做什么,什么是它的问题

var finalQuery = 
       from priceUpdate in **Observable<StockTick>** 
       group priceUpdate by priceUpdate.Stock into grouped 
       from combined in Observable.Interval(TimeSpan.FromSeconds(3)) 
         .CombineLatest(grouped, (t, pei) => new { PEI = pei, Interval = t }) 
       group combined by new { combined.Interval } into combined 
       select new 
       { 
        Interval = combined.Key.Interval, 
        PEI = combined.Select(c => new StockTick(c.PEI.Stock, c.PEI.Price)) 
       }; 

      finalQuery 
       .SelectMany(combined => combined.PEI) 
       .Distinct(pu => new { pu.Stock, pu.Price }) 
       .Subscribe(priceUpdate => 
       { 
        Process(priceUpdate); 
       }); 

public class StockTick 
{ 
    public StockTick(string stock, decimal price) 
    {  
     Stock = stock; 
     Price = price; 
    } 
    public string Stock {get;set;} 
    public decimal Price {get;set;} 
} 

因此,这得到了股票价格,团体它通过股票,再结合最新的这组

ed序列与Observable.Interval。通过这种方式,我试图确保只处理股票的最新股票价格,并且每3秒启动一次。

然后它再次按照时间间隔对它进行分组,结果我有通过的每3秒间隔的一组序列。

作为最后一步,我使用SelectMany将此序列扁平化为股票价格更新的顺序,并且我正在应用Distinct以确保同一个股票的相同价格不会被处理两次。

这个查询有2个问题我不喜欢。首先是我不喜欢双人组合 - 有什么办法可以避免吗?其次 - 用这种方法,我必须逐一处理价格,我真正想要的是快照 - 这是在3秒钟内,无论我有什么我会扣上来发送处理,但无法弄清楚如何到扣起来

我很乐意提供其他方式解决此问题的建议,但我宁愿留在Rx内,除非真的有更好的方法。

+0

在你的例子中,你不会发送微软,因为它没有改变,但Google和你也没有发送。你能澄清一下吗? – Enigmativity

+0

@Enigmativity道歉混淆,意味着写**谷歌:3 $ ** ...将更新现在的帖子 – Michael

回答

3

有两件事情:

  1. 你要采取Sample运营商的优势:
  2. 你可能想DistinctUntilChanged而不是Distinct。如果您使用Distinct,那么如果MSFT从$ 1变为$ 2,然后又变回$ 1,您将不会在第三个记号上得到一个事件。

我想,你的解决方案将是这个样子:

IObservable<StockTick> source; 
source 
    .GroupBy(st => st.Stock) 
    .Select(stockObservable => stockObservable 
     .Sample(TimeSpan.FromSeconds(3)) 
     .DistinctUntilChanged(st => st.Price) 
    ) 
    .Merge() 
    .Subscribe(st => Process(st)); 

编辑Distinct性能问题):

每个Distinct运营商必须在其内保持,充分截然不同的历史。如果你有一个高价股,比如AMZN,到目前为止,它的价格从958美元到974美元不等,那么你可能会得到很多数据。这是〜1600个可能的数据点,必须坐在内存中,直到您退订Distinct。它最终还会降低性能,因为每个AMZN勾号必须在经历之前与1600个现有数据点进行比较。如果这是一个长期运行的过程(跨越多个交易日),那么您将得到更多的数据点。

鉴于N股,你有N Distinct运营商需要相应地操作。乘以N股的行为,你有一个不断增加的问题。

+0

感谢您的答案,让我试试这个并回馈给您。在一个相对说明中,我确实希望'不同',正如我所提到的那样,我处理价格,所以同一价格不应该在收到价格时处理两次。 – Michael

+0

您可能想要小心那里的内存影响。 – Shlomo

+0

内存影响?你能否详细说明一下? – Michael