2012-03-09 110 views
6

我写了一个简单的UDP服务器与Netty,只是打印出日志收到的消息(帧)。为此,我创建了一个简单的帧解码器解码器和一个简单的消息处理程序。我也有一个客户端,可以顺序和/或并行发送多个请求。很多的UDP请求丢失在UDP服务器与Netty

当我配置我的客户端测试仪发送例如几百个请求时,它们之间有一个小的延迟,我用Netty编写的服务器正确接收它们。但是目前我增加了客户端同时请求的数量(例如100),再加上连续请求和少量重复,我的服务器开始丢失许多请求。例如,当我发送50000个请求时,只使用打印出接收到的消息的简单ChannelHandler,我的服务器仅收到大约49000个请求。

当我在该处理程序前添加简单帧解码器(打印出帧并将其复制到另一个缓冲区中)时,服务器仅处理一半的请求!我注意到,无论我指定给创建的NioDatagramChannelFactory的工作器的数量是多少,总是只有一个线程处理请求(我使用推荐的Executors.newCachedThreadPool()作为另一个参数)。

我还创建了另一个基于DatagramSocket与JDK一起的类似的简单UDP服务器,它可以完美处理每个请求(0(零))!当我在客户端发送50000个请求时(例如1000个线程),我在服务器中收到50000个请求。

我在使用Netty配置UDP服务器时做错了什么?或者,也许Netty根本不是为了支持这种负载?为什么只有一个由给定的缓存线程池使用的线程(我注意到只有一个线程,并且通过查看JMX jconsole并通过检查输出日志中的线程名称来使用它们始终是相同的)?我想如果有更多的线程按照预期使用,服务器将能够轻松处理这种负载,因为当不使用Netty时,我可以毫无问题地执行此操作。

见我下面的初始化代码:

... 

lChannelfactory = new NioDatagramChannelFactory(Executors.newCachedThreadPool(), nbrWorkers); 
lBootstrap = new ConnectionlessBootstrap(lChannelfactory); 

lBootstrap.setPipelineFactory(new ChannelPipelineFactory() { 
    @Override 
    public ChannelPipeline getPipeline() 
    { 
     ChannelPipeline lChannelPipeline = Channels.pipeline(); 
     lChannelPipeline.addLast("Simple UDP Frame Dump DECODER", new SimpleUDPPacketDumpDecoder(null));    
     lChannelPipeline.addLast("Simple UDP Frame Dump HANDLER", new SimpleUDPPacketDumpChannelHandler(lOuterFrameStatsCollector));    
     return lChannelPipeline; 
    } 
}); 

bindChannel = lBootstrap.bind(socketAddress); 

... 

而decode()方法在我的解码器中的内容:

protected Object decode(ChannelHandlerContext iCtx, Channel iChannel, ChannelBuffer iBuffer) throws Exception 
{ 
    ChannelBuffer lDuplicatedChannelBuffer = null; 
    sLogger.debug("Decode method called."); 

    if (iBuffer.readableBytes() < 8) return null; 
    if (outerFrameStatsCollector != null) outerFrameStatsCollector.incrementNbrRequests(); 

    if (iBuffer.readable()) 
    {   
     sLogger.debug(convertToAsciiHex(iBuffer.array(), iBuffer.readableBytes()));      
     lDuplicatedChannelBuffer = ChannelBuffers.dynamicBuffer(iBuffer.readableBytes());    
     iBuffer.readBytes(lDuplicatedChannelBuffer); 
    } 

    return lDuplicatedChannelBuffer; 
} 

而的messageReceived的内容()方法在我的处理程序中:

public void messageReceived(final ChannelHandlerContext iChannelHandlerContext, final MessageEvent iMessageEvent) throws Exception 
{ 
    ChannelBuffer lMessageBuffer = (ChannelBuffer) iMessageEvent.getMessage(); 
    if (outerFrameStatsCollector != null) outerFrameStatsCollector.incrementNbrRequests(); 

    if (lMessageBuffer.readable()) 
    {   
     sLogger.debug(convertToAsciiHex(lMessageBuffer.array(), lMessageBuffer.readableBytes()));    
     lMessageBuffer.discardReadBytes(); 
    } 
} 
+0

你都知道,UDP没有交付担保吗? – 2012-03-09 16:54:25

+0

是的,我知道没有这样的交付保证,但我的负载测试是在本地完成的,我不会放弃任何使用DatagramSocket而不是Netty的简单服务器。我目前也正在用WireShark分析这些请求,以验证在一种情况下(没有netty)没有丢失任何东西,并且在使用netty时数据包丢失。 – The4Summers 2012-03-09 17:03:04

+0

@ The4Summers如果你看到Netty丢失了数据包,Netty没有足够快地处理入站数据包,或者你没有处理数据包,所以套接字接收缓冲区正在填充,所以传入的数据包将被丢弃,并且没有其他地方去。加快接收速度或放慢发送速度。 – EJP 2012-03-10 03:51:54

回答

7

您尚未正确配置ConnectionlessBootstrap实例。

  1. 您必须配置以下最佳值。

    SO_SNDBUF大小,大小SO_RCVBUF和ReceiveBufferSizePredictorFactory

    lBootstrap.setOption("sendBufferSize", 1048576); 
    
    lBootstrap.setOption("receiveBufferSize", 1048576); 
    
    lBootstrap.setOption("receiveBufferSizePredictorFactory", 
    new AdaptiveReceiveBufferSizePredictorFactory(MIN_SIZE, INITIAL_SIZE, MAX_SIZE)); 
    

    检查DefaultNioDatagramChannelConfig类的更多细节。

  2. 管道正在使用Netty工作线程执行所有操作。如果 工作线程过载,它将延迟执行选择器事件循环 ,并且在读取/写入通道时会出现瓶颈。您必须添加一个执行处理程序,如下面的 管道中所示。它将释放工作线程来完成自己的工作。

    ChannelPipeline lChannelPipeline = Channels.pipeline(); 
    
    lChannelPipeline.addFirst("execution-handler", new ExecutionHandler(
        new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)); 
    
    //add rest of the handlers here 
    
+0

感谢有关缓冲区大小选项的提示!然而,关于执行处理程序,我在这篇文章后尝试了它,是的,它有点帮助,但是当使用解码器和处理程序时,我仍然丢失了一半的请求。我相信增加缓冲区将会帮助更多。 – The4Summers 2012-03-11 19:22:57

+0

但是,我仍然不明白为什么我们必须在实例化NioDatagramChannelFactory类时指定一个线程池执行程序和一些工作程序,如果它们从不使用的话。 – The4Summers 2012-03-11 19:36:14

+0

我很高兴它为你工作。线程池执行程序用于在NioDatagramChannel管道中创建工作线程。可以将一个工作线程分配给DatagramChannel以进行非阻塞式读/写。如果您的应用程序正在侦听多个端口,则会创建许多工作线程(默认为cpu size * 2)并以循环方式将其分配给DatagramChannel。如果你想有更多的工作线程,你可以在NioDatagramChannelFactory构造函数中指定。 – 2012-03-11 20:45:07