2014-02-15 18 views
7

我写了一个HTTP服务器,它产生由JSON结构事件组成的无尽HTTP流。类似于Twitter的流媒体API。这些事件由\n(根据Server-sent events与Content-Type:文本/事件流)分隔,并且可能会有所不同。当他们到达时,从压缩的,分块的HTTP流中有效地读取行

的响应是

  • 分块(HTTP 1.1传输编码:分块)由于层出不穷
  • 压缩(内容编码:gzip),以节省带宽。

我希望在Python到达时尽快使用这些行,并尽可能节约资源,而不用重新发明轮子。

由于我目前正在使用python-requests,你知道如何使它工作吗? 如果你认为,python-requests在这里没有帮助,我完全开放其他框架/库。

我目前的实施是基于requests并使用iter_lines(...)接收行。但chunk_size参数很棘手。如果设置为1它非常强烈,因为某些事件可能是几千字节。如果设置为大于1的任何值,则有些事件会卡住,直到下一次到达,并且整个缓冲区“已满”。事件之间的时间可能会持续几秒钟。 我预计chunk_size是某种“接收的最大字节数”,如在unix的recv(...)中。相应的手册页说:

的接听电话正常返回的任何数据可用,直到 请求的数量,而不是等待接收请求全额 的。

但这显然不是它在请求库中的工作方式。他们或多或少地使用它作为“准确的接收字节数”。 在查看他们的源代码时,我无法确定哪个部分负责。也许httplib的Response或ssl的SSLSocket。

作为一种解决方法,我尝试将服务器上的行填充到块大小的倍数。 但是请求库中的块大小用于从压缩的响应流中获取字节。 所以这不会工作,直到我可以填充我的行,以便他们的压缩字节序列是块大小的倍数。但是这似乎太过分了。

我读过Twisted可以用于客户端上的HTTP流的非阻塞,非缓冲处理,但我只找到在服务器上创建流响应的代码。

+0

*你知道一个好的框架/ libraray的任务?*图书馆的请求是离题,我害怕。 –

+0

对图书馆的要求抱歉。正如所写,我目前正在使用python-requests,并希望继续使用它。所以我的问题主要是如何用python-requests做些事情。但是:如果没有办法,我完全可以使用另一个库。 –

+0

寻找线条是可能的。当我在一个GZipped Response Stream中刷新Tornado中的响应时,底层压缩zlib支持使用的刷新(Z_SYNC_FLUSH)。所以,http流非常好,分成完美的部分,其中完整的压缩线。仅仅用Python阅读它们是很困难的。 –

回答

8

感谢Martijn Pieters answer我停止了围绕python请求行为的工作,并寻找一种完全不同的方法。

我结束了使用pyCurl。您可以像使用Tornado等一样使用它,类似于select + recv循环,无需反转控制流并放弃对专用IO循环的控制。这种方式很容易使用发电机,一旦它们到达时就会产生新的线路 - 中间层无需进一步缓冲,这可能会引入运行IO环路的延迟或附加线程。

同时,它足够高级,您不需要担心分块传输编码,SSL加密或gzip压缩。

这是我的旧代码,其中chunk_size = 1导致45%的CPU负载,并且引入了额外的延迟。

import requests 
class RequestsHTTPStream(object): 
    def __init__(self, url): 
     self.url = url 

    def iter_lines(self): 
     headers = {'Cache-Control':'no-cache', 
        'Accept': 'text/event-stream', 
        'Accept-Encoding': 'gzip'} 
     response = requests.get(self.url, stream=True, headers=headers) 
     return response.iter_lines(chunk_size=1) 

这里是基于pyCurl我的新代码: (不幸的是,curl_easy_ *风格perform块完全,这使得它很难产生在线路之间不使用线程因此我使用curl_multi_ *方法。 )

import pycurl 
import urllib2 
import httplib 
import StringIO 

class CurlHTTPStream(object): 
    def __init__(self, url): 
     self.url = url 
     self.received_buffer = StringIO.StringIO() 

     self.curl = pycurl.Curl() 
     self.curl.setopt(pycurl.URL, url) 
     self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream']) 
     self.curl.setopt(pycurl.ENCODING, 'gzip') 
     self.curl.setopt(pycurl.CONNECTTIMEOUT, 5) 
     self.curl.setopt(pycurl.WRITEFUNCTION, self.received_buffer.write) 

     self.curlmulti = pycurl.CurlMulti() 
     self.curlmulti.add_handle(self.curl) 

     self.status_code = 0 

    SELECT_TIMEOUT = 10 

    def _any_data_received(self): 
     return self.received_buffer.tell() != 0 

    def _get_received_data(self): 
     result = self.received_buffer.getvalue() 
     self.received_buffer.truncate(0) 
     self.received_buffer.seek(0) 
     return result 

    def _check_status_code(self): 
     if self.status_code == 0: 
      self.status_code = self.curl.getinfo(pycurl.HTTP_CODE) 
     if self.status_code != 0 and self.status_code != httplib.OK: 
      raise urllib2.HTTPError(self.url, self.status_code, None, None, None) 

    def _perform_on_curl(self): 
     while True: 
      ret, num_handles = self.curlmulti.perform() 
      if ret != pycurl.E_CALL_MULTI_PERFORM: 
       break 
     return num_handles 

    def _iter_chunks(self): 
     while True: 
      remaining = self._perform_on_curl() 
      if self._any_data_received(): 
       self._check_status_code() 
       yield self._get_received_data() 
      if remaining == 0: 
       break 
      self.curlmulti.select(self.SELECT_TIMEOUT) 

     self._check_status_code() 
     self._check_curl_errors() 

    def _check_curl_errors(self): 
     for f in self.curlmulti.info_read()[2]: 
      raise pycurl.error(*f[1:]) 

    def iter_lines(self): 
     chunks = self._iter_chunks() 
     return self._split_lines_from_chunks(chunks) 

    @staticmethod 
    def _split_lines_from_chunks(chunks): 
     #same behaviour as requests' Response.iter_lines(...) 

     pending = None 
     for chunk in chunks: 

      if pending is not None: 
       chunk = pending + chunk 
      lines = chunk.splitlines() 

      if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]: 
       pending = lines.pop() 
      else: 
       pending = None 

      for line in lines: 
       yield line 

     if pending is not None: 
      yield pending 

此代码试图如果只有几个到从传入的流取的字节数可能的,而不会不必要地阻塞。相比之下,CPU负载大约为0.2%

+0

这个'self.received_buffer.truncate(0)'不会导致数据丢失吗?我的意思是通过回调方法('self.received_buffer.write')写入'self.received_buffer'的数据,但尚未读取'self.received_buffer.getvalue()'? – Amit

+0

写入不会异步发生,WRITEFUNCTION只在perform()中调用。当你应该调用执行读/写数据时,select()表示不进行不必要的轮询。 –

+0

不错的工作!无论如何,你可以添加一个如何使用它的例子吗?展示如何将它与progressbar.ProgressBar一起使用,我们将不胜感激。 –

6

这不是requests'iter_lines()呼叫阻塞的错误。

Response.iter_lines()方法调用Response.iter_content(),它调用urllib3HTTPResponse.stream(),它调用HTTPResponse.read()

这些调用传递一个块大小,这是传递给套接字的那个,如self._fp.read(amt)。这是有问题的调用,因为self._fp是由socket.makefile()生成的文件对象(由httplib module完成);并且这个.read()调用块直到amt(压缩)字节被读取。

该底层套接字文件对象确实支持.readline()调用,该调用将更高效地工作,但urllib3在处理压缩数据时不能使用此调用;行结束符不会在压缩流中可见。

遗憾的是,urllib3在响应未被压缩时也不会调用self._fp.readline();调用的结构方式很难传递,你希望以行缓冲模式读取,而不是像块缓冲模式那样读取。

我必须说HTTP不是用于流式事件的最佳协议;我会为此使用不同的协议。 Websockets想到了,或者针对您的特定用例的自定义协议。

+0

感谢您指出,“问题”在请求使用的http堆栈中是深入的! –

+0

@ThomasB:是的,也没有我知道的图书馆会以一种干净的方式解决这个问题。压缩在这里没有帮助;你将不得不使用'urllib2',然后从响应对象访问原始套接字,执行非阻塞式读取并进行自己的解压缩。不漂亮。 –

+0

我在阅读[SSE vs WebSockets]后选择了SSE(http://stackoverflow.com/questions/5195452/websockets-vs-server-sent-events-eventsource)。我只需要下游,JS客户端超级简单,服务器超级简单,反向代理,加密,压缩在HTTP上正常工作。 –