2012-04-28 55 views
11

我想用“epoll”式的事件管理,以实现高效的单线程套接字通信。“单子友好”基于事件的IO

如果我写了一个非常必要的程序“从零开始”,我会做它基本上是这样的(我只是打出来只是一些伪代码杂交 - 可能不会编译):

import Control.Concurrent 

import Data.ByteString (ByteString) 
import qualified Data.ByteString as ByteString 

import qualified GHC.Event as Event 

import Network 
import Network.Socket 
import Network.Socket.ByteString 

main = withSocketFromSomewhere $ \ socket -> do 
    let fd = fromIntegral . fdSocket $ socket 

    -- Some app logic 
    state <- newMVar "Bla" 

    -- Event manager 
    manager <- Event.new 

    -- Do an initial write 
    initialWrite socket state manager 

    -- Manager does its thing 
    Event.loop manager 

write manager socket bs = 
    -- Should be pretty straight-forward 
    Event.registerFd manager theWrite fd Event.evtWrite 
    where 
    fd = fromIntegral . fdSocket $ socket 
    theWrite key _ = do 
     Event.unregisterFd manager key 
     sendAll socket bs 

read manager socket cont = 
    -- Ditto 
    Event.registerFd manager theRead fd Event.evtRead 
    where 
    fd = fromIntegral . fdSocket $ socket 
    theRead key _ = do 
     Event.unregisterFd manager key 
     bs <- recv socket 4096 
     cont bs 

initialWrite socket state manager = do 
    msg <- readMVar state 
    write manager socket msg 
    read manager socket $ \ bs -> do 
    ByteString.putStrLn bs 
    putMVar state msg 

试想一下,也有一些函数,添加超时事件的经理,和这样的。

然而,这个代码不是特别漂亮,有几个原因:

  1. 我周围的事件管理器进行手动。
  2. 我必须使用MVar我的应用程序逻辑,因为我不能告诉不透明的事件管理器,它应该通过周围的一些国家对我来说,虽然我知道,它仅使用一个线程,因此有可能被用来作为monad变压器堆栈的基础。
  3. 我要创建明确界定为延续读取(我甚至可能为写入做到这一点,我不知道会是什么在这种情况下更聪明)。

现在,这只是尖叫的使用单子变压器等过多的我想能够只是做到这一点:

main = 
    withSocketFromSomewhere $ \ socket -> 
    runEvents . flip runStateT "Bla" $ initialWrite socket 

initialWrite socket = do 
    msg <- lift get 
    write socket msg 
    resp <- read socket 
    liftIO $ ByteString.putStrLn resp 
    lift $ put msg 

此代码应该有相同的行为以上代码;例如通过暂停计算,直到读已经在resp <- read socket线接收,并让我管理在同一个线程/多经理插座。

问题:

  1. 有没有更高层接口给GHC事件API/libevent的/当量,为用户提供更多的权力?考虑到近期GHC发生的同步IO调度更改(我在7.4.1),它甚至值得吗?
  2. 如果我想实现协同并发,例如通过一个函数总是处理来自套接字的读取操作,但是该函数与写入“线程”共享相同的状态monad,会怎么样?这可以用(1)的解决方案来完成吗?

回答

19

我强烈建议您阅读A language-based approach to unifying events and threads。它讲述了如何构建任何你想要的并发系统上选择您的IO子系统的顶部,并在他们的论文,他们实际上实现它的epoll顶部。

不幸的是,论文中的数据类型和代码示例非常糟糕,需要一段时间(至少对我来说)来对其代码进行反向工程,并且他们的论文中甚至存在一些错误。然而,他们的方法实际上是被称为“自由单体”的非常强大和普遍的方法的一个子集。

例如,他们的Trace数据类型只是一个伪装的免费monad。要知道为什么,我们咨询免费单子的哈斯克尔定义:

data Free f r = Pure r | Free (f (Free f r)) 

免费的单子是像“仿函数列表”,其中Pure类似于一个列表的Nil构造和Free类似于一个列表的Cons因为它在“列表”上添加了一个函子。从技术上讲,如果我是迂腐的,没有什么说自由monad必须像上面的列表类数据类型那样实现,但是无论你实现什么,都必须与上述数据类型同构。

关于自由单子的好处是,给定一个函子fFree f自动为一个单子:

instance (Functor f) => Monad (Free f) where 
    return = Pure 
    Pure r >>= f = f r 
    Free x >>= f = Free (fmap (>>= f) x) 

这意味着我们可以分解它们Trace数据类型分为两个部分,基本函子f和然后通过f产生的自由单子:

-- The base functor 
data TraceF x = 
    SYS_NBIO (IO x) 
    | SYS_FORK x x 
    | SYS_YIELD x 
    | SYS_RET 
    | SYS_EPOLL_WAIT FD EPOLL_EVENT x 

-- You can even skip this definition if you use the GHC 
-- "DerivingFunctor" extension 
instance Functor TraceF where 
    fmap f (SYS_NBIO x) = SYS_NBIO (liftM f x) 
    fmap f (SYS_FORK x) = SYS_FORK (f x) (f x) 
    fmap f (SYS_YIELD x) = SYS_YIELD (f x) 
    fmap f SYS_RET = SYS_RET 
    fmap f (SYS_EPOLL_WAIT FD EPOLL_EVENT x) = SYS_EPOLL_WAIT FD EPOLL_EVEN (f x) 

鉴于函子,你得到的Trace单子“免费”:

type Trace a = Free TraceF a 
-- or: type Trace = Free TraceF 

......虽然这并不是为什么它被称为“免费”monad。

那么这是比较容易界定其所有功能:

liftF = Free . fmap Pure 
-- if "Free f" is like a list of "f", then 
-- this is sort of like: "liftF x = [x]" 
-- it's just a convenience function 

-- their definitions are written in continuation-passing style, 
-- presumably for efficiency, but they are equivalent to these 
sys_nbio io = liftF (SYS_NBIO io) 
sys_fork t = SYS_FORK t (return()) -- intentionally didn't use liftF 
sys_yield = liftF (SYS_YIELD()) 
sys_ret = liftF SYS_RET 
sys_epoll_wait fd event = liftF (SYS_EPOLL_WAIT fd event()) 

,那么你可以使用这些命令就像一个单子:

myTrace fd event = do 
    sys_nbio (putStrLn "Hello, world") 
    fork $ do 
     sys_nbio (putStrLn "Hey") 
    sys_expoll_wait fd event 

现在,这里的关键概念。我刚刚编写的那个monad只创建一个数据类型。而已。它根本不解释它。这就像你将如何为表达式编写抽象语法树一样。这完全取决于你如何评估它。在这篇论文中,他们给出了一个表达解释器的具体例子,但编写自己的例子很简单。

重要的概念是,这个解释器可以运行在你想要的任何单子中。所以如果你想通过并发来为某个状态编程,你可以这样做。例如,这里有一个玩具解释它使用StateT IO单子来跟踪多少次的IO动作被称为:

interpret t = case t of 
    SYS_NBIO io -> do 
     modify (+1) 
     t' <- lift io 
     interpret t' 
    ... 

你甚至可以跨线程操作forkIO'd单子!下面是我的一些很老的代码,因为它是书面回来时,我是有经验的要少得多,不知道自由的单子是什么这是越野车和跛,但它的行动证明了这一点:

module Thread (Thread(..), done, lift, branch, fork, run) where 

import Control.Concurrent 
import Control.Concurrent.STM 
import Control.Monad.Cont 
import Data.Sequence 
import qualified Data.Foldable as F 

data Thread f m = 
    Done 
    | Lift (m (Thread f m)) 
    | LiftIO (IO (Thread f m)) 
    | Branch (f (Thread f m)) 
    | Exit 

done = cont $ \c -> Done 
lift' x = cont $ \c -> Lift $ liftM c x 
liftIO' x = cont $ \c -> LiftIO $ liftM c x 
branch x = cont $ \c -> Branch $ fmap c x 
exit = cont $ \c -> Exit 

fork x = join $ branch [return(), x >> done] 

run x = do 
    q <- liftIO $ newTChanIO 
    enqueue q $ runCont x $ \_ -> Done 
    loop q 
    where 
    loop q = do 
     t <- liftIO $ atomically $ readTChan q 
     case t of 
      Exit -> return() 
      Done -> loop q 
      Branch ft -> mapM_ (enqueue q) ft >> loop q 
      Lift mt -> (mt >>= enqueue q) >> loop q 
      LiftIO it -> (liftIO $ forkIO $ it >>= enqueue q) >> loop q 
    enqueue q = liftIO . atomically . writeTChan q 

点背后免费的monad是他们提供monad实例和NOTHING ELSE。换句话说,他们退后一步,给你完全的自由,你想如何解释它们,这就是为什么他们非常有用。

+1

这非常有趣。我想一个相关的已经存在的Haskell概念是[Coroutine monad](http://hackage.haskell.org/package/monad-coroutine)(它构成了一个免费的monad)。协程基本上可以让你将任何仿函数作为一个带有“暂停”的免费monad,它可以让你跳过必须解释与读/写无关的“无趣的东西”。我会尝试这种方法,现在就接受这个解决方案。 – dflemstr 2012-04-28 18:28:50

+0

是的,我实际上实现了这个“免费单体变压器”,Ross计划将其包含在下一个“变形金刚”版本中。你可以看到我的实现[这里](https://github.com/Gabriel439/Haskell-Pipes-Library/blob/master/Control/Monad/Trans/Free.hs)。我将它用于我的'管道'库。 – 2012-04-28 19:36:15

+0

如果一个免费的monad是一个变压器,那么您如何引入诸如'forkIO'等功能?如何在IO monad中“monad stack”被串行化?那部分对我来说毫无意义。 – dflemstr 2012-04-28 20:14:17