2017-06-04 103 views
0

我试图写一个龙卷风服务器的简单工作负载生成,这里是它的简化版本:如何让龙卷风执行并发代码?

class EventsLoader(object): 

    generate_num_requests = 1000 
    generate_concurrency = 32 
    server_port = 8001 

    def __init__(self, conf_file): 
     self.parse_config(conf_file) 
     self.client = AsyncHTTPClient() 

    def generate(self): 
     IOLoop.current().run_sync(self.generate_work) 

    @gen.coroutine 
    def generate_work(self): 
     self.queue = queues.Queue() 
     IOLoop.current().spawn_callback(self.fetch_requests) 
     for i in range(self.generate_concurrency): 
      yield self.generate_requests(i) 
     print 'before join queue size: %s' % self.queue.qsize() 
     yield self.queue.join() 

    @gen.coroutine 
    def generate_requests(self, i): 
     load = self.generate_num_requests/self.generate_concurrency 
     for j in range(load): 
      request = self.generate_request(i * 1000 + j) 
      self.queue.put(request) 

    @gen.coroutine 
    def fetch_requests(self): 
     while True: 
      try: 
       request = yield self.queue.get() 
       yield self.client.fetch(request) 
      except Exception as e: 
       print 'failed fetching: %s: %s' % (request.body, e) 
      finally: 
       print 'fetched: %s' % json.loads(request.body)['seq'] 
       self.queue.task_done() 

    def generate_request(self, seq): 
     event = { 
      'seq': seq, 
      # ... more fields here ... 
     } 
     return HTTPRequest(
      'http://localhost:%s/events' % self.server_port, 
      method='POST', 
      body=json.dumps(event), 
     ) 

我看到发生的是,所有的消息fetched: xxxx出现的顺序,这是绝对不可能的,如果发电机的确在同时工作。

如何让它同时运行?在我了解I/O循环是什么以及什么是@gen.coroutine时,肯定会有一些巨大的缺失。即不管我的generate_concurrency设置如何,性能不变。

+0

[与龙卷风Python的异步函数调用]的可能的复制(HTTPS运行它们/stackoverflow.com/questions/44139848/async-function-call-with-tornado-python) –

回答

1

无论您如何生成请求,您都只能抓取任务。这是取,不发电,你需要并行:

for i in range(self.fetch_concurrency): 
    IOLoop.current().spawn_callback(self.fetch_requests) 

这会给你多fetch_requests工人可以从共享队列中提取工作。

另外,这段代码的生成部分也没有并行运行。 /:与其

for i in range(self.generate_concurrency): 
     yield self.generate_requests(i) 

它等待一个generate_requests呼叫开始下一个之前完成,则可以并行地

yield [self.generate_requests(i) for i in range(self.generate_concurrency)] 
+0

谢谢,这确实有效,但是,如果你不介意,你能解释为什么'yield' generate_requests'不起作用?不应该让控制权返回到龙卷风下一个'generate_request',而不必等待前一个完成?此外,随着您的更改应用,“i”按顺序生成,但“j”按“随机”(预期)顺序生成。 – wvxvw

+0

其实,让我猜。 'generate_requests'不执行任何I/O操作,并且它是否可以控制或不受控制,Tornado无法利用这一点?或者,也许龙卷风在计划执行的所有协程之间查找潜在候选的间隔非常大,所以'generate_requests'只是在Tornado甚至考虑暂停之前完成。 – wvxvw

+0

是的,Tornado基于协作式多任务处理,'generate_requests'只有在包含等待I/O的yield表达式时才能并行化。但即使这样做,每次迭代时最初的'yield_requests'循环都会指示Tornado等待每个循环完成,然后再转到下一个循环。 –