2017-03-07 74 views
0

我正在探索一种将流量从网络代理复制到两台服务器的方法。 即代替一般执行:Netty代理复制流量

服务器1 - >代理 - >服务器2

我想做到以下几点:

服务器1 - >代理 - >服务器2和服务器3

Server3->代理被丢弃

因此,每个消息被发送到这两个服务器2和服务器3

我只有一个约束代理和服务器2之间的通信不应该被阻止,因为服务器3的(如果服务器3是慢等) 。

我从下面的代码开始:https://github.com/dawnbreaks/TcpProxy

不幸的是,我不是太熟悉,网状,但执行似乎对我的目的非常理想。我想了解:

  1. 如何创建服务器3
  2. 哪些API来覆盖通信服务器3
  3. 一个新的渠道如何阅读和服务器丢弃邮件3

回答

3

在IRC#netty中看到您的聊天。

这里有几件事。您的代理服务器需要有服务器端1连接到的服务器端。然后服务器2和服务器3需要除了来自代理的连接,或者您可以使用UDP(取决于)从代理接收数据。

Netty有一个代理服务器的例子。这对你的情况起作用,对于第三部分来说很容易。简单地说,你可以使用现有的例子,并打开一个新的连接,将服务器3。现在你可以做的是从代理服务器(客户端连接到服务器2和3)两个渠道把他们放在一个渠道组,并写一个时间到两台服务器!我的编辑示例代码将...允许通过代理相互通信从服务器1到服务器2,并允许相互通话,而服务器3只能接收数据,但如果服务器3回复代理,则代理不会执行任何操作。您可能需要添加一个处理程序来释放缓冲区或处理不应该从服务器3回写的数据。从这里开始,您应该可以开始使用netty文件,api,示例和ppts,它们非常有用!

我会附上一些修改后的代码来显示你,这里是一个链接到这些例子。

Netty Proxy Server Examples

所以对于例如,你可以编辑HexDumpProxyFrontendHandler.class,只是添加第二个引导新客户端服务器3。

目前代码

41  @Override 
42  public void channelActive(ChannelHandlerContext ctx) { 
43   final Channel inboundChannel = ctx.channel(); 
44 
45   // Start the connection attempt. 
46   Bootstrap b = new Bootstrap(); 
47   b.group(inboundChannel.eventLoop()) 
48   .channel(ctx.channel().getClass()) 
49   .handler(new HexDumpProxyBackendHandler(inboundChannel)) 
50   .option(ChannelOption.AUTO_READ, false); 
51   ChannelFuture f = b.connect(remoteHost, remotePort); 
52   outboundChannel = f.channel(); 
53   f.addListener(new ChannelFutureListener() { 
54    @Override 
55    public void operationComplete(ChannelFuture future) { 
56     if (future.isSuccess()) { 
57      // connection complete start to read first data 
58      inboundChannel.read(); 
59     } else { 
60      // Close the connection if the connection attempt has failed. 
61      inboundChannel.close(); 
62     } 
63    } 
64   }); 
65  } 

编辑代码

import io.netty.bootstrap.Bootstrap; 
import io.netty.channel.Channel; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelFutureListener; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelOption; 

/* 
* Copyright 2012 The Netty Project 
* 
* The Netty Project licenses this file to you under the Apache License, 
* version 2.0 (the "License"); you may not use this file except in compliance 
* with the License. You may obtain a copy of the License at: 
* 
* http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
* License for the specific language governing permissions and limitations 
* under the License. 
*/ 
package io.netty.example.proxy; 

import io.netty.buffer.Unpooled; 
import io.netty.channel.ChannelInboundHandlerAdapter; 
import io.netty.channel.group.ChannelGroup; 
import io.netty.channel.group.DefaultChannelGroup; 
import io.netty.util.concurrent.GlobalEventExecutor; 

public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter { 

    private final String remoteHost; 
    private final int remotePort; 

    // As we use inboundChannel.eventLoop() when buildling the Bootstrap this does not need to be volatile as 
    // the server2OutboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel. 
    private Channel server2OutboundChannel; 
    private Channel server3OutboundChannel; 

    // TODO You should change this to your own executor 
    private ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 

    public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) { 
     this.remoteHost = remoteHost; 
     this.remotePort = remotePort; 
    } 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) { 
     final Channel inboundChannel = ctx.channel(); 

     // Start the connection attempt to SERVER 3 
     Bootstrap server3Bootstrap = new Bootstrap(); 
     server3Bootstrap.group(inboundChannel.eventLoop()) 
       .channel(ctx.channel().getClass()) 
       // You are only writing traffic to server 3 so you do not need to have a handler for the inbound traffic 
       .handler(new DiscardServerHandler()) // EDIT 
       .option(ChannelOption.AUTO_READ, false); 
     ChannelFuture server3Future = server3Bootstrap.connect(remoteHost, remotePort); 
     server3OutboundChannel = server3Future.channel(); 


     // Start the connection attempt to SERVER 2 
     Bootstrap server2Bootstrap = new Bootstrap(); 
     server2Bootstrap.group(inboundChannel.eventLoop()) 
       .channel(ctx.channel().getClass()) 
       .handler(new HexDumpProxyBackendHandler(inboundChannel)) 
       .option(ChannelOption.AUTO_READ, false); 
     ChannelFuture server2Future = server2Bootstrap.connect(remoteHost, remotePort); 
     server2OutboundChannel = server2Future.channel(); 
     server2Future.addListener(new ChannelFutureListener() { 
      @Override 
      public void operationComplete(ChannelFuture future) { 
       if (future.isSuccess()) { 
        // connection complete start to read first data 
        inboundChannel.read(); 
       } else { 
        // Close the connection if the connection attempt has failed. 
        inboundChannel.close(); 
       } 
      } 
     }); 

     // Here we are going to add channels to channel group to save bytebuf work 
     channels.add(server2OutboundChannel); 
     channels.add(server3OutboundChannel); 
    } 

    // You can keep this the same below or use the commented out section 
    @Override 
    public void channelRead(final ChannelHandlerContext ctx, Object msg) { 
     // You need to reference count the message +1 
     msg.retain(); 
     if (server2OutboundChannel.isActive()) { 
      server2OutboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() { 
       @Override 
       public void operationComplete(ChannelFuture future) { 
        if (future.isSuccess()) { 
         // was able to flush out data, start to read the next chunk 
         ctx.channel().read(); 
        } else { 
         future.channel().close(); 
        } 
       } 
      }); 
     } 
     if (server3OutboundChannel.isActive()) { 
      server3OutboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() { 
       @Override 
       public void operationComplete(ChannelFuture future) { 
        if (future.isSuccess()) { 
         // was able to flush out data, start to read the next chunk 
         ctx.channel().read(); 
        } else { 
         future.channel().close(); 
        } 
       } 
      }); 
     } 


     // Optional to the above code instead channel writing automatically cares for reference counting for you 
//  channels.writeAndFlush(msg).addListeners(new ChannelFutureListener() { 
// 
//   @Override 
//   public void operationComplete(ChannelFuture future) throws Exception { 
//    if (future.isSuccess()) { 
//     // was able to flush out data, start to read the next chunk 
//     ctx.channel().read(); 
//    } else { 
//     future.channel().close(); 
//    } 
//   } 
//  }); 
    } 

    @Override 
    public void channelInactive(ChannelHandlerContext ctx) { 
     if (server2OutboundChannel != null) { 
      closeOnFlush(server2OutboundChannel); 
     } 
     if (server3OutboundChannel != null) { 
      closeOnFlush(server3OutboundChannel); 
     } 


     // Optionally can do this 
//  channels.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
     cause.printStackTrace(); 
     closeOnFlush(ctx.channel()); 
    } 

    /** 
    * Closes the specified channel after all queued write requests are flushed. 
    */ 
    static void closeOnFlush(Channel ch) { 
     if (ch.isActive()) { 
      ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); 
     } 
    } 
} 

丢弃处理程序

这可以作为处理程序添加到服务器3,以丢弃由服务器3写入代理的任何内容。默认情况下,SimpleInboundHandlers在通过递减引用计数来处理它们后将丢弃消息。

Discard Handler Code

+0

非常感谢您的回复,我将在这一点,并尽快给您... – tsar2512

+0

自动执行此操作确保如果服务器3被阻塞,或有放缓也不会影响或减慢服务器1 - >服务器2之间的通信? – tsar2512

+0

除了服务器2之外,唯一的附加负载是数据将套接字留给服务器3.使用通道组进行内部处理不应该对性能产生太大影响。服务器3仍然可以在技术上写入代理,但数据不会被处理。你可以插入一个丢弃处理程序。编辑答案将代理服务器3上连接的丢弃处理程序包括在内,现在,如果服务器3写入代理服务器,则所有数据都将被丢弃。 – Underbalanced