感谢Martijn Pieters answer我停止了围绕python请求行为的工作,并寻找一种完全不同的方法。
我结束了使用pyCurl。您可以像使用Tornado等一样使用它,类似于select + recv循环,无需反转控制流并放弃对专用IO循环的控制。这种方式很容易使用发电机,一旦它们到达时就会产生新的线路 - 中间层无需进一步缓冲,这可能会引入运行IO环路的延迟或附加线程。
同时,它足够高级,您不需要担心分块传输编码,SSL加密或gzip压缩。
这是我的旧代码,其中chunk_size
= 1导致45%的CPU负载,并且引入了额外的延迟。
import requests
class RequestsHTTPStream(object):
def __init__(self, url):
self.url = url
def iter_lines(self):
headers = {'Cache-Control':'no-cache',
'Accept': 'text/event-stream',
'Accept-Encoding': 'gzip'}
response = requests.get(self.url, stream=True, headers=headers)
return response.iter_lines(chunk_size=1)
这里是基于pyCurl我的新代码: (不幸的是,curl_easy_ *风格perform
块完全,这使得它很难产生在线路之间不使用线程因此我使用curl_multi_ *方法。 )
import pycurl
import urllib2
import httplib
import StringIO
class CurlHTTPStream(object):
def __init__(self, url):
self.url = url
self.received_buffer = StringIO.StringIO()
self.curl = pycurl.Curl()
self.curl.setopt(pycurl.URL, url)
self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream'])
self.curl.setopt(pycurl.ENCODING, 'gzip')
self.curl.setopt(pycurl.CONNECTTIMEOUT, 5)
self.curl.setopt(pycurl.WRITEFUNCTION, self.received_buffer.write)
self.curlmulti = pycurl.CurlMulti()
self.curlmulti.add_handle(self.curl)
self.status_code = 0
SELECT_TIMEOUT = 10
def _any_data_received(self):
return self.received_buffer.tell() != 0
def _get_received_data(self):
result = self.received_buffer.getvalue()
self.received_buffer.truncate(0)
self.received_buffer.seek(0)
return result
def _check_status_code(self):
if self.status_code == 0:
self.status_code = self.curl.getinfo(pycurl.HTTP_CODE)
if self.status_code != 0 and self.status_code != httplib.OK:
raise urllib2.HTTPError(self.url, self.status_code, None, None, None)
def _perform_on_curl(self):
while True:
ret, num_handles = self.curlmulti.perform()
if ret != pycurl.E_CALL_MULTI_PERFORM:
break
return num_handles
def _iter_chunks(self):
while True:
remaining = self._perform_on_curl()
if self._any_data_received():
self._check_status_code()
yield self._get_received_data()
if remaining == 0:
break
self.curlmulti.select(self.SELECT_TIMEOUT)
self._check_status_code()
self._check_curl_errors()
def _check_curl_errors(self):
for f in self.curlmulti.info_read()[2]:
raise pycurl.error(*f[1:])
def iter_lines(self):
chunks = self._iter_chunks()
return self._split_lines_from_chunks(chunks)
@staticmethod
def _split_lines_from_chunks(chunks):
#same behaviour as requests' Response.iter_lines(...)
pending = None
for chunk in chunks:
if pending is not None:
chunk = pending + chunk
lines = chunk.splitlines()
if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
pending = lines.pop()
else:
pending = None
for line in lines:
yield line
if pending is not None:
yield pending
此代码试图如果只有几个到从传入的流取的字节数可能的,而不会不必要地阻塞。相比之下,CPU负载大约为0.2%
*你知道一个好的框架/ libraray的任务?*图书馆的请求是离题,我害怕。 –
对图书馆的要求抱歉。正如所写,我目前正在使用python-requests,并希望继续使用它。所以我的问题主要是如何用python-requests做些事情。但是:如果没有办法,我完全可以使用另一个库。 –
寻找线条是可能的。当我在一个GZipped Response Stream中刷新Tornado中的响应时,底层压缩zlib支持使用的刷新(Z_SYNC_FLUSH)。所以,http流非常好,分成完美的部分,其中完整的压缩线。仅仅用Python阅读它们是很困难的。 –