2012-02-07 82 views
9

我正在使用Netty库(版本4来自GitHub)。它在Scala中很好用,但我希望我的库能够使用continuation传递样式来进行异步等待。在netty/NIO监听器中使用scala延续

传统与Netty的,你会做这样的事情(一个例子异步连接操作):

//client is a ClientBootstrap 
val future:ChannelFuture = client.connect(remoteAddr); 
future.addListener(new ChannelFutureListener { 
    def operationComplete (f:ChannelFuture) = { 
     //here goes the code that happens when the connection is made 
    } 
}) 

如果要实现一个库(我是),那么你基本上有三个简单的选项,允许用户该库做的东西连接后:

  1. 刚刚从你的连接方法返回的ChannelFuture,让用户处理它 - 这并不网状提供太多的抽象。
  2. 将ChannelFutureListener作为connect方法的参数,并将其作为侦听器添加到ChannelFuture。
  3. 以一个回调函数对象作为你的连接方法的参数,并调用从您创建的ChannelFutureListener内(这将使用于有点像node.js的回调驱动的风格)

我是什么试图做的是第四种选择;我没有把它列入上面的计数中,因为它并不简单。

我想用Scala的分隔延续,使利用图书馆是有点像堵库,但将非阻塞幕后:

class MyLibraryClient { 
    def connect(remoteAddr:SocketAddress) = { 
     shift { retrn: (Unit => Unit) => { 
       val future:ChannelFuture = client.connect(remoteAddr); 
       future.addListener(new ChannelFutureListener { 
        def operationComplete(f:ChannelFuture) = { 
         retrn(); 
        } 
       }); 
      } 
     } 
    } 
} 

想象中所实现的其他读/写操作同样的时尚。这之中的目标用户的代码可以看起来更象这样:

reset { 
    val conn = new MyLibraryClient(); 
    conn.connect(new InetSocketAddress("127.0.0.1", 1337)); 
    println("This will happen after the connection is finished"); 
} 

换句话说,该程序会看起来像一个简单的阻塞式的计划,但在幕后也不会有任何阻拦或线程。

我遇到的麻烦是我不完全理解分隔连续的输入是如何工作的。当我尝试以上述方式实现它时,编译器会抱怨我的operationComplete实现实际上返回Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit]而不是Unit。我知道scala的CPS中有一个“陷阱”,那就是你必须用@suspendable注释一个shift方法的返回类型,它会被调用堆栈传递到reset,但似乎没有任何方法可以协调即使用没有分隔延续概念的预先存在的Java库。

我觉得实际上一定有办法解决这个问题 - 如果Swarm可以序列化连续性,并将它们在网络上阻塞以便在其他地方计算,那么就可以简单地从一个预先存在的Java类中调用延续。但我无法弄清楚它是如何完成的。为了实现这一点,我需要在Scala中重写整个netty部分吗?

+0

我不知道HOWTO修复斯卡拉的东西,但我建议对你的想法。让我来告诉你为什么。但是让用户“不知道”你的libary的异步特性,你会告诉他这就是它在监听器代码中的“阻塞”调用。事实上,他不知道他甚至把他的代码写在一个听众身上。在监听器中进行阻塞调用可能会导致所有类型的问题。大多数时候你会看到的问题是,它会“减缓”其他io任务,并因此限制吞吐量。 – 2012-02-08 06:27:26

+1

你有一个好点,但我不同意。我认为我的图书馆的用户,如果除了我之外还有其他用户,我们可能不得不理解什么是“重置”开始,从而理解这些调用是非阻塞的。 这实际上只是一种方法:A)更深入地理解分隔延续,B)尝试以更清晰的方式编写基本回调驱动的代码。 – Jeremy 2012-02-08 16:22:42

回答

4

我发现这个解释Scala's continuations当我开始时非常有帮助。特别要注意他解释的部分shift[A, B, C]reset[B, C]。添加虚拟null作为operationComplete的最后声明应该有所帮助。

顺便说一句,如果它可能嵌套在shift之内,则需要在另一个reset内调用retrn()

编辑:这是一个工作示例

import scala.util.continuations._ 
import java.util.concurrent.Executors 

object Test { 

    val execService = Executors.newFixedThreadPool(2) 

    def main(args: Array[String]): Unit = { 
    reset { 
     val conn = new MyLibraryClient(); 
     conn.connect("127.0.0.1"); 
     println("This will happen after the connection is finished"); 
    } 
    println("Outside reset"); 
    } 
} 

class ChannelFuture { 
    def addListener(listener: ChannelFutureListener): Unit = { 
    val future = this 
    Test.execService.submit(new Runnable { 
     def run(): Unit = { 
     listener.operationComplete(future) 
     } 
    }) 
    } 
} 

trait ChannelFutureListener { 
    def operationComplete(f: ChannelFuture): Unit 
} 

class MyLibraryClient { 
    def connect(remoteAddr: String): [email protected][Unit] = { 
    shift { 
     retrn: (Unit => Unit) => { 
     val future: ChannelFuture = new ChannelFuture() 
     future.addListener(new ChannelFutureListener { 
      def operationComplete(f: ChannelFuture): Unit = { 
      println("operationComplete starts") 
      retrn(); 
      null 
      } 
     }); 
     } 
    } 
    } 
} 

有可能输出:

Outside reset 
operationComplete starts 
This will happen after the connection is finished 
+0

这实际上确实使编译器感到高兴,甚至似乎正常工作。 我猜想的关键是你将匿名'ChannelFutureListener'外的'shift'移到了'operationComplete'中,并用闭包调用continuation。我不确定我明白为什么会这样,而另一种方式不行,但我会接受。谢谢! – Jeremy 2012-02-08 16:27:57

+0

这是关于scala延续的一个很好的阅读。他们应该从scala-lang.org页面中删除有关延续的毫无价值的例子,并将它们替换为您链接的文章。 – Jeremy 2012-02-08 16:31:04

+0

@Jeremy是啊,那篇文章是非常好的:) – shams 2012-02-08 17:48:48