2010-12-08 131 views
1

您好,我正在开发一个基于twisted的rpc服务器来服务几个微控制器,这些微控制器使rpc调用到扭曲的jsonrpc服务器。但是应用程序还要求服务器随时向每个微型计算机发送信息,所以问题在于如何防止来自微型计算机的远程jsonrpc调用的响应与服务器jsonrpc请求混淆一个用户。如何实现一个双向jsonrpc + twisted服务器/客户端

现在我得到的结果是微型计算机正在接收不良信息,因为他们不知道来自套接字的netstring/json字符串是来自先前需求的响应还是来自服务器的新请求。

这里是我的代码:

from twisted.internet import reactor 
from txjsonrpc.netstring import jsonrpc 
import weakref 

creds = {'user1':'pass1','user2':'pass2','user3':'pass3'} 

class arduinoRPC(jsonrpc.JSONRPC): 
    def connectionMade(self): 
     pass 

    def jsonrpc_identify(self,username,password,mac): 
     """ Each client must be authenticated just after to be connected calling this rpc """ 
     if creds.has_key(username): 
      if creds[username] == password: 
       authenticated = True 
      else: 
       authenticated = False 
     else: 
      authenticated = False 

     if authenticated: 
      self.factory.clients.append(self) 
      self.factory.references[mac] = weakref.ref(self) 
      return {'results':'Authenticated as %s'%username,'error':None} 
     else: 
      self.transport.loseConnection() 

    def jsonrpc_sync_acq(self,data,f): 
     """Save into django table data acquired from sensors and send ack to gateway""" 
     if not (self in self.factory.clients): 
      self.transport.loseConnection() 
     print f 
     return {'results':'synced %s records'%len(data),'error':'null'} 

    def connectionLost(self, reason): 
     """ mac address is searched and all reference to self.factory.clientes are erased """ 
     for mac in self.factory.references.keys(): 
      if self.factory.references[mac]() == self: 
       print 'Connection closed - Mac address: %s'%mac 
       del self.factory.references[mac] 
       self.factory.clients.remove(self) 


class rpcfactory(jsonrpc.RPCFactory): 
    protocol = arduinoRPC 
    def __init__(self, maxLength=1024): 
     self.maxLength = maxLength 
     self.subHandlers = {} 
     self.clients = [] 
     self.references = {} 

""" Asynchronous remote calling to micros, simulating random calling from server """ 
import threading,time,random,netstring,json 
class asyncGatewayCalls(threading.Thread): 
    def __init__(self,rpcfactory): 
     threading.Thread.__init__(self) 
     self.rpcfactory = rpcfactory 
     """identifiers of each micro/client connected""" 
     self.remoteMacList = ['12:23:23:23:23:23:23','167:67:67:67:67:67:67','90:90:90:90:90:90:90'] 
    def run(self): 
     while True: 
      time.sleep(10) 
      while True: 
       """ call to any of three potential micros connected """ 
       mac = self.remoteMacList[random.randrange(0,len(self.remoteMacList))] 
       if self.rpcfactory.references.has_key(mac): 
        print 'Calling %s'%mac 
        proto = self.rpcfactory.references[mac]() 
        """ requesting echo from selected micro""" 
        dataToSend = netstring.encode(json.dumps({'method':'echo_from_micro','params':['plop']})) 
        proto.transport.write(dataToSend) 
        break 

factory = rpcfactory(arduinoRPC) 

"""start thread caller""" 
r=asyncGatewayCalls(factory) 
r.start() 

reactor.listenTCP(7080, factory) 
print "Micros remote RPC server started" 
reactor.run() 

回答

2

你需要足够的信息添加到每个邮件,以便收件人可以决定如何解释它。您的要求听起来与AMP非常相似,因此您可以使用AMP代替或使用与AMP相同的结构来识别您的消息。具体来说:

  • 在请求中,放置一个特定的密钥 - 例如,AMP使用“_ask”来标识请求。它还为这些提供了一个独特的价值,它进一步确定了连接生命周期的请求。
  • 在回应中,输入了一个不同的键 - 例如,AMP使用“_answer”表示这一点。该值与请求响应中的“_ask”键值相匹配。

使用这种方法,您只需查看是否有“_ask”键或“_answer”键来确定您是否收到了新请求或对先前请求的响应。

在另一个主题上,您的asyncGatewayCalls类不应该是基于线程的。它没有明显的理由使用线程,并且这样做也会滥用Twisted API,从而导致未定义的行为。大多数Twisted API只能用于您调用reactor.run的线程。唯一的例外是reactor.callFromThread,您可以使用它从任何其他线程向反应器线程发送消息。 asyncGatewayCalls尝试写入传输,但这会导致缓冲区损坏或发送数据的任意延迟,或者更糟糕的情况。取而代之的是,你可以写asyncGatewayCalls这样的:

from twisted.internet.task import LoopingCall 

class asyncGatewayCalls(object): 
    def __init__(self, rpcfactory): 
     self.rpcfactory = rpcfactory 
     self.remoteMacList = [...] 

    def run(): 
     self._call = LoopingCall(self._pokeMicro) 
     return self._call.start(10) 

    def _pokeMicro(self): 
     while True: 
      mac = self.remoteMacList[...] 
      if mac in self.rpcfactory.references: 
       proto = ... 
       dataToSend = ... 
       proto.transport.write(dataToSend) 
       break 

factory = ... 
r = asyncGatewayCalls(factory) 
r.run() 

reactor.listenTCP(7080, factory) 
reactor.run() 

这给了你应该有相同的行为,你打算为原asyncGatewayCalls类单线程解决方案。为了安排调用,它不是在线程中的循环中休眠,而是使用反应堆的调度API(通过更高级别的LoopingCall类,该类调度要重复调用的东西)以确保每10秒调用一次_pokeMicro

+0

是的你的权利,几小时前,我正在阅读线程API文档(task.LoopingCall)后得出同样的结论。我测试过,效果很好。感谢您的帮助 – Jaime 2010-12-10 11:27:48