2014-12-02 58 views
2

我试图通过示例代码来简化我的问题。我有一个生产者线程不断地抽取数据,我试图批量处理批量间的时间延迟,以便UI有时间呈现它。但结果并非如预期的那样,产品和消费者似乎处于同一线索。Rx在不同线程上产生和消耗

我不希望批处理缓冲区睡在正在生成的线程上。试过SubscribeOn没有什么帮助。我在这里做错了什么,我如何得到这个在生产者和消费者线程上打印不同的线程ID。

static void Main(string[] args) 
{ 
    var stream = new ReplaySubject<int>(); 

    Task.Factory.StartNew(() => 
    { 
     int seed = 1; 
     while (true) 
     { 
      Console.WriteLine("Thread {0} Producing {1}", 
       Thread.CurrentThread.ManagedThreadId, seed); 

      stream.OnNext(seed); 
      seed++; 

      Thread.Sleep(TimeSpan.FromMilliseconds(500)); 
     } 
    }); 

    stream.Buffer(5).Do(x => 
    { 
     Console.WriteLine("Thread {0} sleeping to create time gap between batches", 
      Thread.CurrentThread.ManagedThreadId); 

     Thread.Sleep(TimeSpan.FromSeconds(2)); 
    }) 
    .SubscribeOn(NewThreadScheduler.Default).Subscribe(items => 
    { 
     foreach (var item in items) 
     { 
      Console.WriteLine("Thread {0} Consuming {1}", 
       Thread.CurrentThread.ManagedThreadId, item); 
     } 
    }); 
    Console.Read(); 
} 

回答

2

了解ObserveOnSubscribeOn之间的区别是关键。请参阅 - ObserveOn and SubscribeOn - where the work is being done以深入解释这些。

此外,你绝对不想在你的Rx中使用Thread.Sleep。或任何地方。永远。 Do几乎是邪恶的,但Thead.Sleep几乎总是邪恶的。缓冲区有几个你想要使用的过载 - 它们包括一个基于时间的过载和一个过载,它们接受一个时间限制的计数限制,当达到其中任何一个时,返回一个缓冲区。基于时间的缓冲将在生产者和消费者之间引入必要的并发 - 也就是说,在与生产者分离的线程上将缓冲区递送给它的订阅者。

另请参阅这些问题和答案,这些问题和答案在保持消费者响应方面进行了良好的讨论(在WPF的背景下,但这些问题通常都适用)。

最后一个问题上面具体使用基于时间的缓冲器过载。正如我所说的,在您的调用链中使用BufferObserveOn将允许您在生产者和消费者之间添加并发。您仍然需要注意缓冲区的处理速度仍然很快,以至于您没有在缓冲区用户上建立队列。

如果队列确实堆积了,您需要考虑应用背压,删除更新和/或合并更新的方法。这是一个很大的话题,在这里深入讨论 - 但基本上你可以:

看看正确的缓冲是否有帮助,然后再考虑在源处的限制/合并事件(一个UI只能显示如此多的信息) - 然后考虑更智能的合并,因为这会变得非常复杂。 https://github.com/AdaptiveConsulting/ReactiveTrader是使用一些高级合并技术的项目的一个很好的例子。

+0

我不能删除事件或采取最新值,我需要将所有消息传递到UI,因为它们形成单独的行。加入ObserveOn似乎有伎俩,为什么说睡觉是邪恶的?是否有更好的方法在每批之间创建延迟? – anivas 2014-12-03 06:41:15

+0

@anivas - 您可以通过使用具有'Interval'可见性的'Zip'来引起延迟。 – Enigmativity 2014-12-03 07:29:20

+0

@Enigmativity试图在间隔计时器的方法创建一个内存泄漏。 – anivas 2014-12-03 08:29:55

2

虽然其他答案是正确的,但我想确定您的实际问题,可能是对Rx行为的误解。让制片人睡觉可以阻止对OnNext的后续调用,并且您似乎认为Rx会自动同时调用OnNext,但事实上这并不是因为很好的原因。实际上,Rx有一个需要序列化通知的合同。

有关详细信息,请参阅Rx Design Guidelines中的第4.2,6.7节。

最终,它看起来好像您试图从Rxx实施BufferIntrospective运算符。此运算符允许您传入类似于ObserveOn的并发性调度程序,以在生产者和使用者之间创建并发边界。 BufferIntrospective是一种动态背压策略,根据观察者变化的潜伏期推出异质大小的批次。当观察者正在处理当前批次时,操作员会缓存所有传入的并发通知。为了做到这一点,运营商利用了这样一个事实,即OnNext是一个阻塞呼叫(根据§4.2合同),因此,应尽可能在接近查询边缘的地方应用此运营商,一般在您致电Subscribe之前。正如詹姆斯所描述的那样,你可以把它称为“智能缓冲”策略本身,或者将其视为实施这种策略的基准;例如,我还定义了一个SampleIntrospective运算符,用于删除每批中的最后一个通知。

+1

有用的评论,这些设计指导方针是Rx dev恕我直言需要阅读。 +1 – 2014-12-03 15:41:36