我试图通过示例代码来简化我的问题。我有一个生产者线程不断地抽取数据,我试图批量处理批量间的时间延迟,以便UI有时间呈现它。但结果并非如预期的那样,产品和消费者似乎处于同一线索。Rx在不同线程上产生和消耗
我不希望批处理缓冲区睡在正在生成的线程上。试过SubscribeOn
没有什么帮助。我在这里做错了什么,我如何得到这个在生产者和消费者线程上打印不同的线程ID。
static void Main(string[] args)
{
var stream = new ReplaySubject<int>();
Task.Factory.StartNew(() =>
{
int seed = 1;
while (true)
{
Console.WriteLine("Thread {0} Producing {1}",
Thread.CurrentThread.ManagedThreadId, seed);
stream.OnNext(seed);
seed++;
Thread.Sleep(TimeSpan.FromMilliseconds(500));
}
});
stream.Buffer(5).Do(x =>
{
Console.WriteLine("Thread {0} sleeping to create time gap between batches",
Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(TimeSpan.FromSeconds(2));
})
.SubscribeOn(NewThreadScheduler.Default).Subscribe(items =>
{
foreach (var item in items)
{
Console.WriteLine("Thread {0} Consuming {1}",
Thread.CurrentThread.ManagedThreadId, item);
}
});
Console.Read();
}
我不能删除事件或采取最新值,我需要将所有消息传递到UI,因为它们形成单独的行。加入ObserveOn似乎有伎俩,为什么说睡觉是邪恶的?是否有更好的方法在每批之间创建延迟? – anivas 2014-12-03 06:41:15
@anivas - 您可以通过使用具有'Interval'可见性的'Zip'来引起延迟。 – Enigmativity 2014-12-03 07:29:20
@Enigmativity试图在间隔计时器的方法创建一个内存泄漏。 – anivas 2014-12-03 08:29:55