2016-04-29 88 views
3

我有简单的vert.x应用:Vert.x多线程网络插座

public class Main { 
public static void main(String[] args) { 
    Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40).setInternalBlockingPoolSize(40)); 
    Router router = Router.router(vertx); 
    long main_pid = Thread.currentThread().getId(); 
    Handler<ServerWebSocket> wsHandler = serverWebSocket -> { 
     if(!serverWebSocket.path().equalsIgnoreCase("/ws")){ 
      serverWebSocket.reject(); 
     } else { 
      long socket_pid = Thread.currentThread().getId(); 
      serverWebSocket.handler(buffer -> { 
       String str = buffer.getString(0, buffer.length()); 
       long handler_pid = Thread.currentThread().getId(); 
       log.info("Got ws msg: " + str); 
       String res = String.format("(req:%s)main:%d sock:%d handlr:%d", str, main_pid, socket_pid, handler_pid); 
       try { 
        Thread.sleep(500); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       serverWebSocket.writeFinalTextFrame(res); 
      }); 
     } 
    }; 
    vertx 
     .createHttpServer() 
     .websocketHandler(wsHandler) 
     .listen(8080); 
} 
} 

当我连接多个客户端此服务器我看到它在一个线程。但我想要并行处理每个客户端连接。我应该如何改变这个代码来做到这一点?

+1

我会将它包装在一个'Verticle'中,并多次使用'DeploymentOptions.setInstances()'开始'Verticle'。我可以提供一个例子,如果这是你想要的。 – alexvetter

+0

谢谢,这就是我已经做了))只是犹豫。 –

+1

请考虑Will的答案。你可能应该看看[事件驱动的并发性](http://berb.github.io/diploma-thesis/community/055_events.html)和异步I/O。 – alexvetter

回答

3

此:

新VertxOptions()setWorkerPoolSize(40).setInternalBlockingPoolSize(40)

看起来像你想创建自己的HTTP连接池,这很可能不是。你真正想要什么。

的想法Vert.x和其他无阻塞event-loop基础框架,是我们不尝试1 thread -> 1 connection亲和力,相反,当一个请求,目前正在由事件循环线程服务正在等待IO - EG响应从一个数据库 - 事件循环线程被释放到服务另一个连接。这然后允许单个事件循环线程以类似并发的方式服务于多个连接。

如果您想充分利用您的机器上的所有内核,并且只打算运行一个verticle,则在部署Verticle时将实例数设置为核心数。

IE

Vertx.vertx().deployVerticle("MyVerticle", new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors())); 
1

Vert.x是一个reactive框架,这意味着它使用一个单独的线程模型来处理所有应用程序负载。这个模型是known比螺纹模型更好地缩放。

要知道的关键是,你放在一个处理程序必须永远不会阻止所有代码(如Thread.sleep),因为它会阻止主线程。如果你已经阻止代码(例如说一个JDBC调用),你应该换你的拦截代码在executingBlocking处理程序,如:

serverWebSocket.handler(buffer -> { 
    String str = buffer.getString(0, buffer.length()); 
    long handler_pid = Thread.currentThread().getId(); 
    log.info("Got ws msg: " + str); 
    String res = String.format("(req:%s)main:%d sock:%d handlr:%d", str, main_pid, socket_pid, handler_pid); 
    vertx.executeBlocking(future -> { 
    try { 
     Thread.sleep(500); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    serverWebSocket.writeFinalTextFrame(res); 
    future.complete(); 
    }); 
}); 

现在所有的拦截代码将在线程的线程池运行您可以配置为已在其他回复中显示。

如果您希望避免编写所有这些执行阻塞处理程序,并且您知道需要执行多个阻塞调用,那么您应该考虑使用worker verticle,因为这些将在事件总线级别进行扩展。

多线程的最后一点是,如果你使用多线程,你的服务器将不会像单线程那样高效,例如它将无法处理1000万个websocket,因为1000万线程事件是一个现代机器(我们在2016年)将使您的操作系统调度程序瘫痪。