2011-06-09 58 views
0

我在kombu有两个队列;一个提交请求(执行某些操作),另一个通过pub/sub提取所述请求的增量状态。因此在我的过程中它将发布到请求队列并在响应队列中消耗。由于任务可能需要一些时间,因此我想向用户提供关于后端中发生的事情的反馈;它们都工作在命令行中,我的海带consume回调允许我,说,加logging.info()语句信息吐回我的用户:但是django和kombu的流视图

def callback(msg, env): 
    logging.info(str(msg)) 

consumer.register_callback(callback) 
consumer.consume() 
while continue_consuming: 
    connection.drain_events() 

,我现在希望能够提供相同的在Django的功能。我知道我可以创建一个generator功能输入到HttpResponse对象:

def view(reqeust): 
    HttpResponse(gen()) 

def gen(): 
    yield 'streaming... ' 

,但我不能概念化,我怎么能实现海带队列中的消息回调到一个发电机来提供这个...任何想法?

我想避免使用数据库层来存储进度/结果,如果可能的话。

回答

0

最后,我决定重新编码一下;因为我有一个kombu队列周围的包装,使界面更像multiprocess.Queue,我为我的get()方法创建了一个生成器。

def get(self, until=None): 
    if until == None: 
     until = self.end_marker 
    for c in count(): 
     m = self.consumer.queues[0].get(True) 
     if not m == None: 
      if m.payload == until: 
       raise StopIteration 
      yield m.payload 

这似乎很好地工作 - 但不是所有的清洁,因为我有必要知道什么self.end_markerutil是,也可能希望通过所有消费者队列迭代(但我的课是每个对象队列反正,所以这不是太糟糕了)

那么我在我看来,做的是:

def view(response): 
    q = Queue() 
    return HttpResponse(q.get()) 

有关于各种中间件的方式获得大量的职位;我只是不打扰使用它们,它似乎工作正常。