给定一个队列像这样:Scalaz流分块多达N
val queue: Queue[Int] = async.boundedQueue[Int](1000)
欲拉断该队列和它传输到下游水槽,在UP的组块100。
queue.dequeue.chunk(100).to(downstreamConsumer)
作品之类的,但如果我有说101个的消息就不会清空队列。剩下1条消息,除非另有99个消息被推入。我希望尽可能多地从队列中抽取100个消息,这与我的下游过程可以处理的速度一样快。
有一个现有的组合子可用?
你将如何实现elasticChunk? –
我实际上使用方便的q.dequeueBatch方法解决了这个问题。不知道它存在。 –