2013-05-09 77 views

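Migrating sendUpstream in Netty 4

I am migrating from Netty 3 to Netty 4. I have a pipeline handler that acts as a classic filter: it intercepts and handles non-conforming messages along the way and passes conforming messages upstream.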

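According to the documentation (http://netty.io/wiki/new-and-noteworthy.html), I expected to use ctx.fireInboundBufferUpdated() in place of ctx.sendUpstream() to relay inbound messages. However, I found that this does not work, whereas ChannelHandlerUtil.addToNextInboundBuffer() does. I would appreciate some guidance on: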

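  1. My confusion over the current documentation's assertion that ctx.sendUpstream -> ctx.fireInboundBufferUpdated, and
  2. What the best practice is in this case, if it differs from what I have done below.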

The code:

//The pipeline 

public class ServerInitializer extends ChannelInitializer<SocketChannel> { 

@Override 
public void initChannel(SocketChannel ch) throws Exception { 
    ChannelPipeline p = ch.pipeline(); 
    p.addLast("decoder", new HttpRequestDecoder()); 
    p.addLast("encoder", new HttpResponseEncoder()); 
    p.addLast("inbound", InboundHttpRequestFilter.INSTANCE); 
    p.addLast("handler", handlerClass.newInstance()); 

} 
} 

//The filter 
public class InboundHttpRequestFilter extends 
     ChannelInboundMessageHandlerAdapter<Object> { 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, Object msg) 
      throws Exception { 
     // ... discard/handle as necessary ...
     //ctx.fireInboundBufferUpdated(); - doesn't propagate upstream 
     ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); // sends upstream 
    } 
} 

Answer


Try this:

ctx.nextInboundMessageBuffer().add(msg) 

Javadoc:

Interface ChannelHandlerContext 
MessageBuf<Object> nextInboundMessageBuffer() 
    Return the MessageBuf of the next ChannelInboundMessageHandler in the pipeline. 
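
One way to read the Javadoc above is that each inbound message handler has its own buffer, and that notifying the next handler is a separate step from putting a message into that buffer. The sketch below is a toy model of that reading using only the Java standard library. The names ToyHandler, addToNext, fireUpdated and ToyPipelineDemo are hypothetical and are not Netty types; the model only illustrates why a notification on its own might deliver nothing, which is consistent with what the question observed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only: hypothetical classes, NOT Netty types.
// Each handler owns an inbound buffer and drains it when it is notified.
class ToyHandler {
    final Queue<String> inboundBuffer = new ArrayDeque<>(); // stands in for MessageBuf<Object>
    final List<String> seen = new ArrayList<>();            // messages this handler processed
    ToyHandler next;                                        // next handler, or null at the end

    // Stands in for ctx.nextInboundMessageBuffer().add(msg).
    void addToNext(String msg) {
        if (next != null) {
            next.inboundBuffer.add(msg);
        }
    }

    // Stands in for ctx.fireInboundBufferUpdated(): ask the next handler to drain.
    void fireUpdated() {
        if (next != null) {
            next.drain();
        }
    }

    // Process everything currently buffered; an empty buffer means no work.
    void drain() {
        String msg;
        while ((msg = inboundBuffer.poll()) != null) {
            seen.add(msg);
        }
    }
}

class ToyPipelineDemo {
    // Notify without adding: the next handler is woken but its buffer is empty.
    static List<String> notifyOnly() {
        ToyHandler first = new ToyHandler();
        ToyHandler second = new ToyHandler();
        first.next = second;
        first.fireUpdated();
        return second.seen;
    }

    // Add, then notify: the next handler finds the message and processes it.
    static List<String> addThenNotify(String msg) {
        ToyHandler first = new ToyHandler();
        ToyHandler second = new ToyHandler();
        first.next = second;
        first.addToNext(msg);
        first.fireUpdated();
        return second.seen;
    }

    public static void main(String[] args) {
        System.out.println(notifyOnly());           // prints []
        System.out.println(addThenNotify("hello")); // prints [hello]
    }
}
```

In this model the message reaches the second handler only when it is added to that handler's buffer before the notification, which mirrors the add-then-fire order used in MultiHandler01.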

Netty 4 multi-handler example:

MultiHandlerServer.java

import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import io.netty.handler.codec.LineBasedFrameDecoder; 
import io.netty.handler.codec.string.StringDecoder; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.nio.charset.Charset; 

public class MultiHandlerServer { 
    private static final Logger logger = LoggerFactory.getLogger(MultiHandlerServer.class); 

    final int port; 

    public MultiHandlerServer(final int port) { 
     this.port = port; 
    } 

    public void run() throws InterruptedException { 
     final NioEventLoopGroup bossGroup = new NioEventLoopGroup(); 
     final NioEventLoopGroup workerGroup = new NioEventLoopGroup(); 
     try { 

      final ServerBootstrap serverBootstrap = new ServerBootstrap() 
        .group(bossGroup, workerGroup) 
        .channel(NioServerSocketChannel.class) 
        .childHandler(new ChannelInitializer<SocketChannel>() { 
         @Override 
         protected void initChannel(SocketChannel ch) throws Exception { 
          ch.pipeline().addLast(
            new LineBasedFrameDecoder(8192), 
            new StringDecoder(Charset.forName("UTF-8")), 
            new MultiHandler01(), new MultiHandler02()); 
         } 
        }); 

      final ChannelFuture future = serverBootstrap.bind(port).sync(); 
      future.channel().closeFuture().sync(); 
     } finally { 
      bossGroup.shutdownGracefully(); 
      workerGroup.shutdownGracefully(); 
     } 
    } 

    public static void main(String[] args) throws InterruptedException { 
     final MultiHandlerServer server = new MultiHandlerServer(8080); 
     server.run(); 
    } 
} 

MultiHandler01.java

import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundMessageHandlerAdapter; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

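/**
 * First handler in the pipeline: logs each decoded line, then forwards it
 * to the next inbound handler.
 */
class MultiHandler01 extends ChannelInboundMessageHandlerAdapter<String> {
    private static final Logger logger = LoggerFactory.getLogger(MultiHandler01.class);

    @Override
    public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        // Log the decoded line using SLF4J's {} placeholder.
        logger.info("Handler01 received message: {}", msg);
        // Queue the message for the next handler, then signal that its buffer changed.
        ctx.nextInboundMessageBuffer().add(msg);
        ctx.fireInboundBufferUpdated();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // The {} is filled with the remote address; the trailing Throwable is logged with its stack trace.
        logger.error("Exception caught from {}", ctx.channel().remoteAddress(), cause);
        ctx.close();
    }
}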

MultiHandler02.java

import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundMessageHandlerAdapter; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

/**
 * Last handler in the pipeline: logs each message it receives and does not
 * forward it any further.
 */
class MultiHandler02 extends ChannelInboundMessageHandlerAdapter<String> {
    private static final Logger logger = LoggerFactory.getLogger(MultiHandler02.class);

    @Override
    public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        // Log the forwarded line using SLF4J's {} placeholder.
        logger.info("Handler02 received message: {}", msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // The {} is filled with the remote address; the trailing Throwable is logged with its stack trace.
        logger.error("Exception caught from {}", ctx.channel().remoteAddress(), cause);
        ctx.close();
    }
}
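
The question describes a filter that handles non-conforming messages itself and passes conforming ones upstream. The same add-then-notify order can be applied conditionally, as in the standard-library sketch below. ToyNextStage, ToyFilter and the "starts with GET" rule are hypothetical and chosen only for illustration; they are not Netty types and not the asker's actual conformance check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for the next handler's inbound buffer and its update signal.
class ToyNextStage {
    final List<String> buffer = new ArrayList<>(); // messages forwarded by the filter
    int updates = 0;                               // how many times the stage was notified

    void add(String msg) {
        buffer.add(msg);
    }

    void fireUpdated() {
        updates++;
    }
}

// Hypothetical filter: forwards conforming messages, keeps the rest for local handling.
class ToyFilter {
    private final Predicate<String> conforming;
    private final ToyNextStage next;
    final List<String> rejected = new ArrayList<>();

    ToyFilter(Predicate<String> conforming, ToyNextStage next) {
        this.conforming = conforming;
        this.next = next;
    }

    void messageReceived(String msg) {
        if (!conforming.test(msg)) {
            // Handled here; nothing is added to the next stage and no update is fired.
            rejected.add(msg);
            return;
        }
        // Conforming: queue the message for the next stage, then notify it.
        next.add(msg);
        next.fireUpdated();
    }
}

class ToyFilterDemo {
    public static void main(String[] args) {
        ToyNextStage next = new ToyNextStage();
        // Assumed rule for the demo: only messages starting with "GET " conform.
        ToyFilter filter = new ToyFilter(m -> m.startsWith("GET "), next);

        filter.messageReceived("GET /a");
        filter.messageReceived("BREW /pot");
        filter.messageReceived("GET /b");

        System.out.println(next.buffer);     // prints [GET /a, GET /b]
        System.out.println(filter.rejected); // prints [BREW /pot]
    }
}
```

Returning early for rejected messages means the next stage is never notified about a message it will not receive.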

Thanks, this works with one modification: 'ctx.nextInboundMessageBuffer().add(request)'. I will upvote once you edit :). – 2013-05-10 20:48:47


Thank you very much for the comment! :) – 2013-05-13 11:35:34