2014-10-28 260 views
2

我使用EmbeddedChannel来测试我的handlerscodecs在下面的格式处理消息:的Netty ByteToMessageCodec <ByteBuf>解码消息两次(部分)

+------------+------------------+----------------+  
| Header | Payload Length | Payload  | 
| 16 bytes |  2 bytes  | "Some data" |  
+------------+------------------+----------------+ 

首先,我想达到的目标:

  1. 通过创建一个对象来存储标题详细信息并将解码后的标题对象添加到ChannelHandlerContextAttributeMap中以供将来使用;
  2. 等待/检索整个有效载荷数据;
  3. 将Header对象和整个有效载荷作为ByteBuf用于路由消息的最终处理程序。

我使用以下处理:

  1. ByteToMessageCodec<ByteBuf>提取头信息,并将其添加到属性列表。
  2. LengthFieldBasedFrameDecoder读取有效载荷长度并等待/检索整个帧。
  3. SimpleChannelInboundHandler这将使用从属性列表检索到的头对象相应地路由有效载荷。

当一个消息被传递给ByteToMessageCodecdecode方法,所述报头处理和正确地提取。然后,我继续将头对象添加到AttributeMap并添加ByteBuf(其有一个可读字节= 2字节(有效负载长度指示符)+有效负载长度)。

假设有效载荷长度是1020字节。该消息最初由codec收到,将有readableBytes = 16 bytes + 2 bytes + 1020 bytes。通过decode方法读取头部,然后将余下的可用字节(1022)添加到List<Object> out

如果我的理解是正确的,该字节的其余部分将现在被传递到下一个处理是LengthFieldBasedFrameDecoder将读取的长度指标和有效载荷(1020个字节)传递给SimpleChannelHanlder,但我一定是搞错了。

decode方法被称为再次,与List<Object> out相同的1022字节。

在解码方法那里的JavaDoc如下:

Decode the from one ByteBuf to an other. This method will be called till either the input ByteBuf 
has nothing to read when return from this method or till nothing was read from the input ByteBuf. 

这是否意味着decode将被调用,直到readableBytes == 0

将其余消息传递给LengthFieldBasedFrameDecoder的最有效方法是什么?

我承担LengthFieldBasedFrameDecoder需要ByteBuf作为输入,所以这是否意味着我需要设置readerIndex = 0和ByteBuf的副本添加到List<Object> out

任何帮助/建议/批评将不胜感激,我想以最干净的方式做到这一点。

这里是我的decode方法:

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 
    byte [] headerBytes = new byte[HEADER_LENGTH]; 
    in.readBytes(headerBytes, 0, HEADER_LENGTH); 

    Header header = new Header(headerBytes); 
    System.out.println("Decoded Header: \n" + header); 

    //Set the header attribute so it can be used by routing handlers 
    ctx.attr(ChannelAttributes.HEADER).getAndSet(header); 
    //pass to next handler 
    out.add(in); 
} 

注:我在读Netty in Action MEAP v8

回答

2

这是否意味着解码将被调用,直到readableBytes == 0?

基本上,是的。一个ByteToMessageDecoder的简化图看起来像这样

while (in.isReadable()) { 
    int outputSizeBefore = out.size(); 
    int readableBytesBefore = in.readableBytes(); 

    callYourDecodeImpl(ctx, in, out); 

    int outputSizeAfter = out.size(); 
    int readableBytesAfter = in.readableBytes(); 

    boolean didNotDecodeAnything = outputSizeBefore == outputSizeAfter; 
    boolean didNotReadAnything = readableBytesBefore == readableBytesAfter; 

    if(didNotDecodeAnything && didNotReadAnything) { 
     break; 
    } 

    // next iteration, continue with decoding 
} 

所以,你的解码器将继续,直到输入缓冲耗尽读头。

为了得到你想要的行为,您必须将isSingleDecode标志设置为true:

class MyDecoder extends ByteToMessageDecoder { 

    MyDecoder() { 
     setSingleDecode(true); 
    } 

    // your decode impl as before 
} 

MyDecoder decoder = new MyDecoder(); 
decoder.setSingleDecode(true); 

这将导致循环停止后您的解码实现解码东西。 现在您的LengthFieldBasedFrameDecoder将与您加入out列表中的ByteBuf一起调用。 帧解码按照您所描述的方式工作,无需将副本添加到列表中。 你的SimpleChannelInboundHandler将被称为有效载荷帧作为msg

但是,您将无法读取来自AttributeMap的头在你的SimpleChannelInboundHandler 因为ChannelHandlerContext是不同的,每个通道一个处理器中,atrributes不共享。

解决此问题的一种方法是为此使用事件。 在你decoder,而不是添加的HeaderAttributeMap,把它作为一个事件:

// instead of 
// ctx.attr(Header.ATTRIBUTE_KEY).getAndSet(header); 
// do this 
ctx.fireUserEventTriggered(ChannelAttributes.HEADER); 

然后,写你的SimpleChannelInboundHandler像这样

class MyMessageHandler extends SimpleChannelInboundHandler<ByteBuf> { 

    private Header header = null; 

    MyMessageHandler() { 
      super(true); 
    } 

    @Override 
    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception { 
     if (evt instanceof Header) { 
      header = (Header) evt; 
     } else { 
      super.userEventTriggered(ctx, evt); 
     } 
    } 

    @Override 
    protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws Exception { 
     if (header != null) { 
      System.out.println("header = " + header); 
      // continue with header, such as routing... 
     } 
     header = null; 
    } 
} 

另一种方法是,以这两个对象发下来管道和使用 ChannelInboundHandlerAdapter而不是SimpleChannelInboundHandler。 在你decoder,而不是添加的HeaderAttributeMap,将它添加到out

// ... 
out.add(header); 
out.add(in); 

然后,写你的ChannelInboundHandler像这样

class MyMessageHandler extends ChannelInboundHandlerAdapter { 
    private Header header = null; 

    @Override 
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { 
     if (msg instanceof Header) { 
      header = (Header) msg; 
      System.out.println("got the header " + header); 
     } else if (msg instanceof ByteBuf) { 
      ByteBuf byteBuf = (ByteBuf) msg; 
      System.out.println("got the message " + msg); 
      try { 
       // continue with header, such as routing... 
      } finally { 
       ReferenceCountUtil.release(msg); 
      } 
     } else { 
      super.channelRead(ctx, msg); 
     } 
    } 
} 

LengthFieldBasedFrameDecoder简单地忽略消息,是不是ByteBuf s, 所以您的标题只会通过它(因为它不实施ByteBuf)和 到达您的ChannelInboundHandler。然后,该消息将被解码为 有效载荷帧并传递给您的ChannelInboundHandler

+1

谢谢! 'setSingleDecode(true)'正是我所需要的。但是我似乎无法在实现'ByteToMessageCodec'时找到方法,只有'ByteToMessageDecoder'。无论如何,将分解编码和解码过程。并感谢有关将'header'传递给下一个处理程序的提示。然而,我通过执行'ctx.channel()。attr(ChannelAttributes.HEADER).set(header)'来设置和检索'header',并将它添加到频道的'AttributeMap'中。这样做有什么缺点? – Ian2thedv 2014-10-29 07:33:05

+1

啊,是的,'setSingleDecode'只存在于'ByteToMessageDecoder'上。至于频道上的属性地图,我没有看到任何缺点,这当然是一个可行的解决方案。 – knutwalker 2014-10-29 08:56:32