2011-10-02 36 views
0

我正在实现一个Ruby中的小型节俭(0.6.0)服务器,以便向具有多个连接(多个客户端)的另一个协议的代理角色发送到单个服务器。我希望能够在服务器端保持每个客户端的数据,并跟踪处理函数的多个调用中的“会话”参数。在节俭处理程序函数中获取对等地址

我目前使用Thrift::NonblockingServer作为SimpleServer似乎不允许并发连接。

我知道如何使用TCPSocket::peeraddrThrift::NonblockingServer::IOManager::Worker::run它读取帧创建一个临时MemoryBufferTransport并传递作为输入/输出协议下到处理器,使得它似乎是信息不从那里传下来的。

有没有一个干净的方法来做到这一点? 我正在考虑重新定义上面提到的Thrift::NonblockingServer::IOManager::Worker::run以在其他参数中包含fd或其他ID以处理或增强原始实例,但是因为我还必须担心生成的一个红宝石代码层(process_*方法class Processor)似乎有点沉重。

我不知道以前是否有人做过这样的事情。

谢谢!

p.s.这是类似的问题this C++ thrift question

回答

0

这是我如何去改变Thrift::NonblockingServer::IOManager::Worker::run来支持这一点。

几个注意事项:

  • 正如我提到的问题,我不认为这是干净的(如果不出意外,我将不得不监测未来节俭版本在这个功能的改变,这是基于0.6。 0)。
  • 这是写入与红宝石1.8工作(或我会得到非翻译addr /端口see my other question
  • 我是一个红宝石新手..我敢肯定我已经做了一些这种“错误的” (例如,$connections@@connectionsConnEntry或差异类?)。
  • 我知道,Thread.current哈希线程本地存储具有a namespace pollution issue

首先,在一些核心模块:

module MyThriftExt 

    $connections={} 

    class ConnEntry 
    attr_reader :addr_info, :attr 

    def initialize(thrift_fd) 
     @addr_info=thrift_fd.handle.peeraddr 
     @attr={} 
    end 

    def self.PreHandle(fd) 
     $connections[fd]=ConnEntry.new(fd) unless $connections[fd].is_a? ConnEntry 
     # make the connection entry as short-term thread-local variable 
     # (cleared in postHandle) 
     Thread.current[:connEntry]=$connections[fd] 
    end 

    def self.PostHandle() 
     Thread.current[:connEntry]=nil 
    end 

    def to_s() 
     "#{addr_info}" 
    end end end 


module Thrift class NonblockingServer 
    class IOManager 
     alias :old_remove_connection :remove_connection 
     def remove_connection(fd) 
     $connections.delete fd 
     old_remove_connection(fd) 
     end 

     class Worker 
     # The following is verbatim from thrift 0.6.0 except for the two lines 
     # marked with "Added" 
     def run 
      loop do 
      cmd, *args = @queue.pop 
      case cmd 
      when :shutdown 
       @logger.debug "#{self} is shutting down, goodbye" 
       break 
      when :frame 
       fd, frame = args 
       begin 
       otrans = @transport_factory.get_transport(fd) 
       oprot = @protocol_factory.get_protocol(otrans) 
       membuf = MemoryBufferTransport.new(frame) 
       itrans = @transport_factory.get_transport(membuf) 
       iprot = @protocol_factory.get_protocol(itrans) 
       MyThriftExt::ConnEntry.PreHandle(fd) # <<== Added 
       @processor.process(iprot, oprot) 
       MyThriftExt::ConnEntry.PostHandle  # <<== Added 
       rescue => e 
       @logger.error "#{Thread.current.inspect} raised error: #{e.inspect}\n#{e.backtrace.join("\n")}" 
       end 
      end 
      end 
     end 
     end 
    end 
    end 
end 

然后在处理任何时候,你可以连接访问Thread.current[:connEntry].addr_info具体数据或存储任何关于Thread.current[:connEntry].attr哈希中的连接。