2013-02-20 71 views
4

我是相当新的扭曲,我试图做一个异步客户端,获取一些网址,并将结果保存到每个网址的不同文件。当我用有限数量的服务器运行程序时,比如说10,反应器循环正确结束并且程序终止。但是当我用例如Alexa top 2500运行程序时,程序开始提取url,但是不会终止。我设置了超时时间,但它不起作用,我相信必须有一些开放的套接字,不会因为错误或成功而触发任何回调。我的目标是一旦程序获取了页面,或者每连接超时已经到期,程序必须终止并关闭所有活动的文件描述符。蟒蛇扭曲代理超时

对不起,但复制和粘贴时没有保留代码缩进,现在我已经检查并且已修复。该代码是举例说明的最低限度,请注意,我的问题是,当我启动具有大量站点爬行的程序时,反应器并未停止。

#!/usr/bin/env python 

from pprint import pformat 
from twisted.internet import reactor 
import twisted.internet.defer 
import sys 
from twisted.internet.protocol import Protocol 
from twisted.web.client import Agent 
from twisted.web.http_headers import Headers 

class PrinterClient(Protocol): 
    def __init__(self, whenFinished, output): 
     self.whenFinished = whenFinished 
     self.output = output 

    def dataReceived(self, bytes): 
     #print '##### Received #####\n%s' % (bytes,) 
     self.output.write('%s' % (bytes,)) 

    def connectionLost(self, reason): 
     print 'Finished:', reason.getErrorMessage() 
     self.output.write('Finished: %s \n'%(reason.getErrorMessage())) 
     self.output.write('#########end########%s\n'%(reason.getErrorMessage())) 
     self.whenFinished.callback(None) 

def handleResponse(r, output, url): 
    output.write('############start############\n') 
    output.write('%s\n'%(url)) 
    #print "version=%s\ncode=%s\nphrase='%s'" % (r.version, r.code, r.phrase) 
    output.write("version=%s\ncode=%s\nphrase='%s'"\ 
      %(r.version, r.code, r.phrase)) 
    for k, v in r.headers.getAllRawHeaders(): 
     #print "%s: %s" % (k, '\n '.join(v)) 
     output.write("%s: %s\n" % (k, '\n '.join(v))) 
    whenFinished = twisted.internet.defer.Deferred() 
    r.deliverBody(PrinterClient(whenFinished, output)) 
    return whenFinished 

def handleError(reason): 
    print reason 
    #reason.printTraceback() 
    #reactor.stop() 

def getPage(url, output): 
    print "Requesting %s" % (url,) 
    d = Agent(reactor).request('GET', 
         url, 
    Headers({'User-Agent': ['Mozilla/4.0 (Windows XP 5.1) Java/1.6.0_26']}), 
         None) 
    d._connectTimeout = 10 
    d.addCallback(handleResponse, output, url) 
    d.addErrback(handleError) 
    return d 

if __name__ == '__main__': 
    semaphore = twisted.internet.defer.DeferredSemaphore(500) 
    dl = list() 
    ipset = set() 
    queryset = set(['http://www.google.com','http://www.google1.com','http://www.google2.com', "up to 2500 sites"]) 
    filemap = {} 
    for q in queryset: 
     fpos = q.split('http://')[1].split(':')[0] 
     dl.append(semaphore.run(getPage, q, filemap[fpos])) 
    dl = twisted.internet.defer.DeferredList(dl) 
    dl.addCallbacks(lambda x: reactor.stop(), handleError) 
    reactor.run() 
    for k in filemap: 
     filemap[k].close() 

谢谢。 Jeppo

+0

代码太多,是否可以缩小到显示错误的最小工作示例? – 2013-02-20 17:01:48

+1

不仅代码太多,而且在语法上无效。对于初学者,修复缩进。 – Glyph 2013-02-20 17:57:53

回答

6

您的超时代码至少有两个问题。

首先,您设置的唯一超时时间为_connectTimeout,并且您将其设置为从Agent.request返回的Deferred。这是一个毫无意义的属性,并且Agent实现和Twisted的任何部分都不会尊重它。我想你的意思是在Agent实例上设置这个属性,而不是在那里会产生一些影响。但是,这是一个私人属性,不适合您直接与之交互。相反,您应该将connectTimeout=10传递给Agent初始值设定项。

其次,此超时只影响TCP连接建立超时。将其设置为10意味着如果无法在少于10秒内建立到特定URL的HTTP服务器的TCP连接,请求尝试将失败,并出现超时错误。如果连接在不到10秒的时间内成功建立,则超时没有其他意义。如果服务器需要10个小时向您发送回复,则Agent将坐在那里等待10个小时。你需要一个额外的超时,一个完整的请求超时。

这是分开使用reactor.callLater和可能Deferred.cancel实施。例如,

... 
d = agent.request(...) 
timeoutCall = reactor.callLater(60, d.cancel) 
def completed(passthrough): 
    if timeoutCall.active(): 
     timeoutCall.cancel() 
    return passthrough 
d.addBoth(completed) 
...