编辑:我误解了你在问什么; 你可能可以在管道内做到这一点,但我不确定动机是什么。我建议建立可重复使用的管道链,只需使用工人派遣给他们,而不是试图在管道内建立工人。如果您将其构建到管道中,您将失去任何订单保证,即第一个是第一个。
关于Work Stealing的部分是您正在寻找的内容,该代码基本上来自教程的逐字,但让我们分解它的工作原理。下面是我们可以做你想做的一种方式:
module Main(main) where
import Pipes
import Pipes.Concurrent
import Control.Concurrent.Async (async, wait)
import Control.Concurrent (threadDelay)
import Control.Monad (forM)
a :: Producer Int IO()
a = each [1..10]
b :: Pipe Int Int IO()
b = do
x <- await
yield (x*2)
b
c :: Consumer Int IO()
c = do
x <- await
lift $ print x
c
main :: IO()
main = do
(output, input) <- spawn unbounded
feeder <- async $ do runEffect $ a >-> toOutput output
performGC
workers <- forM [1..3] $ \i ->
async $ do runEffect $ fromInput input >-> b >-> c
performGC
mapM_ wait (feeder:workers)
第一行spawn unbounded
是Pipes.Concurrent,它初始化一个“邮箱”具有用于输入和输出的手柄。它首先让我困惑,但在这种情况下,我们将消息发送到输出并将它们从输入中拉出。这类似于golang等语言中的推拉式消息频道。
我们指定Buffer来表示我们可以存储多少条消息,在这种情况下,我们设置无限制的无限制。
好吧,所以邮箱已经初始化了,现在我们可以创建Effect
了,它会发送邮件给它。邮箱通道使用STM实现,这就是它可以异步收集邮件的方式。
让我们创建一个提供邮箱的异步作业;
feeder <- async $ do runEffect $ a >-> toOutput output
performGC
的a >-> toOutput output
只是正常的管道组成,我们需要toOutput
输出转换成管道。请注意,performGC
调用也是IO的一部分,它允许Pipes.Concurrent知道在作业完成后清理。如果我们愿意的话,我们可以使用forkIO
来运行,但在这种情况下,我们使用async
,这样我们可以等待结果稍后完成。好吧,所以我们的邮箱应该是异步的收到消息,让我们把它们拉出来做一些工作。
workers <- forM [1..3] $ \i ->
async $ do runEffect $ fromInput input >-> b >-> c
performGC
和以前一样的想法,但是这次我们只是产卵了其中的一些。我们使用fromInput
从输入中读取输入信息,然后通过链条的其余部分运行,在完成后清理。 input
将确保每次只有一个工作人员收到一个值。当所有的工作完成时(它跟踪所有打开的工作),然后它将关闭input
管道,工人将完成。
如果您在网络工作者场景中使用此功能,您将有一个主循环继续向toOutput output
频道发送请求,然后产生尽可能多的工人,只要您愿意从fromInput input
拉入管道。
https://hackage.haskell.org/package/pipes-async-0.1.1/docs/src/Pipes-Async.html#buffer看起来像它可以被调整为有多个给定的管道供给TQueue,所以我们得到'[Pipe ab] - > Pipe ab'。我已发布[功能请求](https://github.com/jwiegley/pipes-async/issues/1)...一小时前>:D – Gurkenglas