2015-04-05 67 views
3

我想利用Python的新asyncio库发送异步HTTP请求。我希望在发送每个请求之前等待几个毫秒(timeout变量) - 但是当然 - 将它们全部异步发送,而不是在发送每个请求之后等待响应。Python的asyncio同步工作

我做类似如下:

@asyncio.coroutine 
def handle_line(self, line, destination): 
    print("Inside! line {} destination {}".format(line, destination)) 
    response = yield from aiohttp.request('POST', destination, data=line, 
           headers=tester.headers) 
    print(response.status) 
    return (yield from response.read()) 

@asyncio.coroutine 
def send_data(self, filename, timeout): 
    destination='foo' 
    logging.log(logging.DEBUG, 'sending_data') 
    with open(filename) as log_file: 
     for line in log_file: 
      try: 
       json_event = json.loads(line) 
      except ValueError as e: 
       print("Error parsing json event") 
      time.sleep(timeout) 
      yield from asyncio.async(self.handle_line(json.dumps(json_event), destination)) 


loop=asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1)) 

,我获得(通过打印200级响应)是这样的代码是同步运行的输出。我究竟做错了什么?

回答

5

有几个问题在这里:

  1. 你应该使用asyncio.sleep,不time.sleep,因为后者将阻止事件循环。

  2. 你不应该使用yield fromasyncio.async(self.handle_line(...))电话后,因为那样会使脚本块,直到self.handle_line协程完成,这意味着你最终不会同时做任何事情;你处理每一行,等待处理完成,然后继续下一行。相反,您应该在不等待的情况下运行所有​​asyncio.async调用,将Task对象返回到列表中,然后使用asyncio.wait等待它们全部完成后,全部启动它们。

把所有在一起:

@asyncio.coroutine 
def handle_line(self, line, destination): 
    print("Inside! line {} destination {}".format(line, destination)) 
    response = yield from aiohttp.request('POST', destination, data=line, 
           headers=tester.headers) 
    print(response.status) 
    return (yield from response.read()) 

@asyncio.coroutine 
def send_data(self, filename, timeout): 
    destination='foo' 
    logging.log(logging.DEBUG, 'sending_data') 
    tasks = [] 
    with open(filename) as log_file: 
     for line in log_file: 
      try: 
       json_event = json.loads(line) 
      except ValueError as e: 
       print("Error parsing json event") 
      yield from asyncio.sleep(timeout) 
      tasks.append(asyncio.async(
       self.handle_line(json.dumps(json_event), destination)) 
    yield from asyncio.wait(tasks) 


asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1)) 
+0

嗯...... 谢谢。 但我不确定这正是我想要的。 asyncio.sleep的收益将在将任务附加到任务列表之前等待超时。 asyncio.wait的收益率将同时发送所有请求。 我希望它们能够在每个请求之间的超时间隔之后一个接一个地发送。该代码不会在每个请求之间等待。它会一次发送它们。 – OhadBasan 2015-04-06 14:26:46

+1

@OhadBasan asyncio.sleep(timeout)行的'yield将使得代码在启动每个任务之间等待'timeout'秒,而不仅仅是在将它附加到任务列表之间。这个任务实际上被添加到事件循环中,并且在您调用'asyncio.async(task())'时立即启动。你不需要从任务中“放弃”它开始执行。 – dano 2015-04-06 14:29:21

+2

@OhadBasan它可以帮助我们认识到从'产出',而不是启动一个协程,而是等待它的完成。 – 2015-06-03 00:30:37