2011-03-01 110 views
2

我正在构建一个服务器,它使用Twisted Python在Redis顶部存储关键/值数据。 服务器通过HTTP接收JSON字典,将其转换为Python字典并放入缓冲区。每次存储新数据时,服务器都会调度一个任务,该任务从缓冲区中弹出一个字典,并使用txredis客户端将每个元组写入Redis实例。Twisted Python中的另一个生产者/消费者问题

class Datastore(Resource): 

isLeaf = True 

def __init__(self): 
    self.clientCreator = protocol.ClientCreator(reactor, Redis) 
    d = self.clientCreator.connectTCP(...) 
    d.addCallback(self.setRedis) 
    self.redis = None 
    self.buffer = deque() 


def render_POST(self, request): 
    try: 
     task_id = request.requestHeaders.getRawHeaders('x-task-id')[0] 
    except IndexError: 
     request.setResponseCode(503) 
     return '<html><body>Error reading task_id</body></html>' 

    data = json.loads(request.content.read()) 
    self.buffer.append((task_id, data)) 
    reactor.callLater(0, self.write_on_redis) 
    return ' ' 

@defer.inlineCallbacks 
def write_on_redis(self): 
    try: 
     task_id, dic = self.buffer.pop() 
     log.msg('Buffer: %s' % len(self.buffer)) 
    except IndexError: 
     log.msg('buffer empty') 
     defer.returnValue(1) 

    m = yield self.redis.sismember('DONE', task_id) 
    # Simple check 
    if m == '1': 
     log.msg('%s already stored' % task_id) 
    else: 
     log.msg('%s unpacking' % task_id) 
     s = yield self.redis.sadd('DONE', task_id) 

     d = defer.Deferred() 
     for k, v in dic.iteritems(): 
      k = k.encode() 
      d.addCallback(self.redis.push, k, v) 

     d.callback(None) 

基本上,我面对两种不同的连接之间的生产者/消费者问题,但我不知道,目前的实施效果很好的扭曲paradygm。 我已经阅读了关于Twisted中生产者/消费者接口的小文档,但我不确定我是否可以在我的情况下使用它们。 欢迎任何评论家:在线程并发多年后,我试图掌握事件驱动编程。

回答

2

Twisted生产者和消费者API,IProducerIConsumer,都是关于流量控制。您似乎没有任何流量控制,只是将消息从一种协议转发给另一种协议。

由于没有流量控制,缓冲区只是额外的复杂性。只需将数据直接传递给write_on_redis方法即可摆脱它。这种方式write_on_redis不需要处理空的缓冲区大小,你不需要额外的资源属性,甚至可以摆脱callLater(即使你保留缓冲区也可以做到这一点)。

虽然我不知道这是否回答你的问题。至于这种做法是否“效果很好”,这里是我发现的东西通过阅读代码:

  • 如果数据到达比redis的接受它快,你的优秀作业列表可能会变得任意大,导致你耗尽内存。这就是流量控制所能提供的帮助。
  • 没有错误围绕sismember来电或sadd呼叫处理,你可能会失去工作如果这些失败,因为你已经从工作缓冲区弹出他们。
  • 做一个推的回调对Deferredd也意味着,任何失败的推动将防止数据的其余部分被推动。它也经过push返回Deferred的结果(我假设它返回一个Deferred)作为第一个参数传递给下一个电话,所以除非push或多或少地忽略了它的第一个参数,你会不会推正确的数据到redis。

如果你想实现流量控制,那么你需要让你的H​​TTP服务器检查self.buffer长度和可能拒绝新的任务 - 将其添加到self.buffer并返回一些错误代码到客户端。你仍然不会使用IConsumerIProducer,但它有点类似。