
gevent pool gets stuck

I'm a gevent newbie, but I think I have it working, in a limited sense. Basically, with a pool of 1 the code proceeds, while with larger pools it gets stuck, usually within the first pool (e.g. with a pool of 5, I see 3 greenlets finish, but then no more). What is going wrong? The spawn? The join?

I cannot verify whether the remote server gets confused by multiple queries, but it has no problem with a rapid sequence of serial requests, so that is probably not it...

(I'm sharing the code in full because I'm not sure where the error is. Thanks for any help.)

from urllib2 import urlopen
from lxml.etree import parse
import os, sys, csv, cStringIO, codecs, pickle  # sys was missing; it is used in the except blocks below
from selenium import webdriver
from time import sleep
import gevent
from gevent import socket
from gevent import monkey, pool
# patches stdlib (including socket and ssl modules) to cooperate with other greenlets
monkey.patch_all()


class UnicodeWriter:
    """
    A CSV writer which will write rows to CSV file "f",
    which is encoded in the given encoding.
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # Redirect output to a queue
        self.queue = cStringIO.StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = codecs.getincrementalencoder(encoding)()

    def writerow(self, row):
        self.writer.writerow([unicode(s).encode("utf-8") for s in row])
        # Fetch UTF-8 output from the queue ...
        data = self.queue.getvalue()
        data = data.decode("utf-8")
        # ... and reencode it into the target encoding
        data = self.encoder.encode(data)
        # write to the target stream
        self.stream.write(data)
        # empty queue
        self.queue.truncate(0)

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)

os.chdir('/Users/laszlosandor/Downloads/kozbeszerzes') 

HOSTNAME = 'http://kozbeszerzes.ceu.hu' 

driver = webdriver.Chrome() 
results = set() 

for y in xrange(1998, 2015):
    for p in xrange(0, 9999):
        driver.get('http://kozbeszerzes.ceu.hu/searchresults.xhtml?q={}&page={}'.format(y, p))
        sleep(1)
        if len(driver.find_elements_by_class_name('result')) == 0:
            break
        for e in driver.find_elements_by_class_name('result'):
            link = e.find_element_by_tag_name('a')
            r = link.get_attribute('href').encode('ascii', 'ignore')
            if r.startswith('http://kozbeszerzes.ceu.hu/tender/'):
                results.add(r)
driver.quit() 

with open('list_of_urls', 'wb') as f: 
    pickle.dump(results, f) 
#with open('list_of_urls', 'rb') as f:
#    results = pickle.load(f)

entities = set() 

header = ('TenderID','RequestorName','URL','Year','RequestorID','Subject','SourceURL','EstValue','Currency','DecisionDate','Value','VAT') 

# """Spawn multiple workers and wait for them to complete""" 
# # limit ourselves to max 10 simultaneous outstanding requests 
p = pool.Pool(10) 

f = open('tenders.csv', 'w') 
f.write(codecs.BOM_UTF8) 
writer = UnicodeWriter(f) 
writer.writerow(header) 

def workres(res):
    try:
        tender = parse(urlopen(res)).getroot()
        print ('%s succeeded' % res)
        for requestor in tender.findall('requestor'):
            entities.add(HOSTNAME + requestor.get('url'))
        id = tender.get('id')
        reqname = tender.get('requestor')
        url = tender.get('url')
        year = tender.get('year')
        reqid = tender.get('requestor_id')
        subject = tender.get('subject')
        source = tender.get('source_url')
        estval = tender.get('estimated_value')
        for part in tender.findall('./parts/part'):
            winner = part.find('winner')
            entities.add(HOSTNAME + winner.get('url'))
            curr = part.find('currency').text
            date = part.find('decisionDate').text
            value = part.find('value').text
            vat = part.find('vat').text
            row = id, reqname, url, year, reqid, subject, source, estval, curr, date, value, vat
            writer.writerow(row)
    except socket.gaierror:
        ex = sys.exc_info()[1]
        print ('%s failed with %s' % (res, ex))

jobs = [p.spawn(workres, res) for res in results] 
p.join() 

f.close() 

with open('entities', 'wb') as f: 
    pickle.dump(entities, f) 

header = ['ID','URL','Name','NominalCity','City', 'ZIP', 'Address'] 

f = open('entities.csv', 'w') 
f.write(codecs.BOM_UTF8) 
writer = UnicodeWriter(f) 
writer.writerow(header) 

def workent(ent):
    try:
        entity = parse(urlopen(ent)).getroot()  # don't rebind 'ent': the log lines below need the URL
        print ('%s succeeded' % ent)
        id = entity.get('id')
        url = entity.get('url')
        name = entity.get('name')
        nominalcity = entity.get('city')
        cities = entity.findall('./resolved_addresses/whitelistAddress/city')
        zips = entity.findall('./resolved_addresses/whitelistAddress/postalCode')
        streets = entity.findall('./resolved_addresses/whitelistAddress/street')
        for a in xrange(0, len(cities)):
            city = cities[a].text
            zip = zips[a].text
            street = streets[a].text
            row = id, url, name, nominalcity, city, zip, street
            writer.writerow(row)
    except socket.gaierror:
        ex = sys.exc_info()[1]
        print ('%s failed with %s' % (ent, ex))

jobs = [p.spawn(workent, ent) for ent in entities] 
p.join() 

f.close() 

Answer


I see many mistakes here.

  • Use gevent.sleep() and not time.sleep(), which is blocking.
  • Your variable names are too short. You could add descriptions of what each part of the code is supposed to do. For example, the variable 'p' is used twice..
  • There are multiple URL fetches, using both urlopen and the webdriver module? Confusing..
  • I would use a queue between the different workers and have only one worker doing the write_row calls and handling the file access; right now you have multiple greenlets accessing the same file (see the sketch after this list)..
  • Use fewer list comprehensions, just write out the loops.
  • I suggest putting the try/except in 'workres' only around the 'parse(urlopen())' call; the code may raise more exceptions that you currently don't see.
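
For the queue suggestion, here is a minimal sketch of the single-writer pattern. It assumes the `writer` (the UnicodeWriter) and the `results` set from the question; `row_queue`, `writer_loop`, `worker`, and the DONE sentinel are made-up names for illustration, not part of the original code:

from urllib2 import urlopen
from lxml.etree import parse
import gevent
from gevent import monkey, pool
from gevent.queue import Queue
monkey.patch_all()

row_queue = Queue()   # rows travel from the workers to the single writer
DONE = object()       # sentinel telling the writer to stop

def writer_loop(csv_writer):
    # The only greenlet that ever touches the file.
    for row in row_queue:
        if row is DONE:
            break
        csv_writer.writerow(row)

def worker(url):
    try:
        doc = parse(urlopen(url)).getroot()  # keep the try/except this narrow
    except Exception as ex:
        print('%s failed with %s' % (url, ex))
        return
    # ... extract the real fields from doc here ...
    row_queue.put((url, doc.get('id')))      # hand the row off instead of writing it

request_pool = pool.Pool(10)
writer_greenlet = gevent.spawn(writer_loop, writer)
for res in results:
    request_pool.spawn(worker, res)
request_pool.join()    # wait for all workers to finish
row_queue.put(DONE)    # then let the writer drain the queue and exit
writer_greenlet.join()

This way the CSV stream has a single owner, so interleaved writerow calls from concurrent greenlets cannot garble rows.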

more tips for gevent


Thanks, my code is surely a quick hack, but I still don't understand why the gevent pool doesn't succeed. Multiple greenlets writing to the same file didn't seem to be a problem in the many examples on the web; I just followed them. And the sleep is in the first half of the code, to get (scrape) the list of addresses that the greenlets eventually fetch. So a blocking sleep shouldn't be a problem for the 'workent' function, should it? Suppose that after the first half I already have the pickled list of entities. Why can't they 'run' in a gevent pool? Thanks! –
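
To see which of these cases applies, one option is to inspect the greenlets themselves. This is a debugging sketch, not code from the thread; it reuses the question's workres, results, and pool size, and the 60-second timeout is an arbitrary assumption:

import gevent
from gevent import pool

p = pool.Pool(10)
jobs = [p.spawn(workres, res) for res in results]
p.join(timeout=60)  # give up after a minute instead of blocking forever

for job in jobs:
    if not job.ready():
        print('never finished: %r' % job)        # this greenlet is the one that is stuck
    elif not job.successful():
        print('died with: %r' % job.exception)   # an exception the narrow except did not catch

If some jobs never become ready, something inside the worker is blocking without yielding to other greenlets; if they die with exceptions, the narrow except socket.gaierror is hiding the real error.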