2017-01-01 51 views
2

我有一个使用管道的一些Haskell代码:如何使管道与Haskell的管道库并发?

module Main(main) where 
import Pipes 

a :: Producer Int IO() 
a = each [1..10] 

b :: Pipe Int Int IO() 
b = do 
    x <- await 
    yield (x*2) 
    b 

c :: Consumer Int IO() 
c = do 
    x <- await 
    lift $ print x 
    c 

main :: IO() 
main = runEffect $ a >-> b >-> c 

Pipes.Concurrent tutorial演示使用多个工人工作窃取一起。我如何在b内做类似的事情?我想b使用一定数量的工作人员同时执行它的工作。

很显然,并发性在这种情况下并没有用,但它是我能想到的最简单的例子。在我的真实使用案例中,我想使用有限数量的工作人员同时发出一些Web请求。

+1

https://hackage.haskell.org/package/pipes-async-0.1.1/docs/src/Pipes-Async.html#buffer看起来像它可以被调整为有多个给定的管道供给TQueue,所以我们得到'[Pipe ab] - > Pipe ab'。我已发布[功能请求](https://github.com/jwiegley/pipes-async/issues/1)...一小时前>:D – Gurkenglas

回答

2

编辑:我误解了你在问什么; 你可能可以在管道内做到这一点,但我不确定动机是什么。我建议建立可重复使用的管道链,只需使用工人派遣给他们,而不是试图在管道内建立工人。如果您将其构建到管道中,您将失去任何订单保证,即第一个是第一个。

关于Work Stealing的部分是您正在寻找的内容,该代码基本上来自教程的逐字,但让我们分解它的工作原理。下面是我们可以做你想做的一种方式:

module Main(main) where 
import Pipes 
import Pipes.Concurrent 

import Control.Concurrent.Async (async, wait) 
import Control.Concurrent (threadDelay) 
import Control.Monad (forM) 

a :: Producer Int IO() 
a = each [1..10] 

b :: Pipe Int Int IO() 
b = do 
    x <- await 
    yield (x*2) 
    b 

c :: Consumer Int IO() 
c = do 
    x <- await 
    lift $ print x 
    c 

main :: IO() 
main = do 
    (output, input) <- spawn unbounded 
    feeder <- async $ do runEffect $ a >-> toOutput output 
         performGC 

    workers <- forM [1..3] $ \i -> 
    async $ do runEffect $ fromInput input >-> b >-> c 
       performGC 

    mapM_ wait (feeder:workers) 

第一行spawn unbounded是Pipes.Concurrent,它初始化一个“邮箱”具有用于输入和输出的手柄。它首先让我困惑,但在这种情况下,我们将消息发送到输出并将它们从输入中拉出。这类似于golang等语言中的推拉式消息频道。

我们指定Buffer来表示我们可以存储多少条消息,在这种情况下,我们设置无限制的无限制。

好吧,所以邮箱已经初始化了,现在我们可以创建Effect了,它会发送邮件给它。邮箱通道使用STM实现,这就是它可以异步收集邮件的方式。

让我们创建一个提供邮箱的异步作业;

feeder <- async $ do runEffect $ a >-> toOutput output 
        performGC 

a >-> toOutput output只是正常的管道组成,我们需要toOutput输出转换成管道。请注意,performGC调用也是IO的一部分,它允许Pipes.Concurrent知道在作业完成后清理。如果我们愿意的话,我们可以使用forkIO来运行,但在这种情况下,我们使用async,这样我们可以等待结果稍后完成。好吧,所以我们的邮箱应该是异步的收到消息,让我们把它们拉出来做一些工作。

workers <- forM [1..3] $ \i -> 
    async $ do runEffect $ fromInput input >-> b >-> c 
      performGC 

和以前一样的想法,但是这次我们只是产卵了其中的一些。我们使用fromInput从输入中读取输入信息,然后通过链条的其余部分运行,在完成后清理。 input将确保每次只有一个工作人员收到一个值。当所有的工作完成时(它跟踪所有打开的工作),然后它将关闭input管道,工人将完成。

如果您在网络工作者场景中使用此功能,您将有一个主循环继续向toOutput output频道发送请求,然后产生尽可能多的工人,只要您愿意从fromInput input拉入管道。