2010-09-06 67 views
1

我写了一个简单的webpool使用线程池。问题是:然后爬虫遍布整个网站它必须完成,但实际上它最终等待一些东西,并且脚本没有完成,为什么会发生这种情况?python threadpool问题(等待什么)

from Queue import Queue 
from threading import Thread 

import sys 
from urllib import urlopen 
from BeautifulSoup import BeautifulSoup, SoupStrainer 
import re 
from Queue import Queue, Empty 
from threading import Thread 

visited = set() 
queue = Queue() 

class Worker(Thread): 
    """Thread executing tasks from a given tasks queue""" 
    def __init__(self, tasks): 
     Thread.__init__(self) 
     self.tasks = tasks 
     self.daemon = True 
     self.start() 

    def run(self): 
     while True: 
      func, args, kargs = self.tasks.get() 
      print "startcall in thread",self 
      print args 
      try: func(*args, **kargs) 
      except Exception, e: print e 
      print "stopcall in thread",self 
      self.tasks.task_done() 

class ThreadPool: 
    """Pool of threads consuming tasks from a queue""" 
    def __init__(self, num_threads): 
     self.tasks = Queue(num_threads) 
     for _ in range(num_threads): Worker(self.tasks) 

    def add_task(self, func, *args, **kargs): 
     """Add a task to the queue""" 
     self.tasks.put((func, args, kargs)) 

    def wait_completion(self): 
     """Wait for completion of all the tasks in the queue""" 
     self.tasks.join() 


def process(pool,host,url): 

    try: 
     print "get url",url 
     #content = urlopen(url).read().decode(charset) 
     content = urlopen(url).read() 
    except UnicodeDecodeError: 
     return 

    for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')): 
     #print "link",link 
     try: 
      href = link['href'] 
     except KeyError: 
      continue 


     if not href.startswith('http://'): 
      href = 'http://%s%s' % (host, href) 
     if not href.startswith('http://%s%s' % (host, '/')): 
      continue 



     if href not in visited: 
      visited.add(href) 
      pool.add_task(process,pool,host,href) 
      print href 




def start(host,charset): 

    pool = ThreadPool(7) 
    pool.add_task(process,pool,host,'http://%s/' % (host)) 
    pool.wait_completion() 

start('simplesite.com','utf8') 

回答

1

我看到的问题是,你永远在运行退出。所以,永远会阻止。工作完成后,你需要打破这个循环。

你可以尝试:
1)后task.get(...)运行插入

if not func: break 

2)追加

pool.add_task(None, None, None) 

过程的结束。

这是进程通知他没有更多任务要处理的方法。

+0

thanx,寻求帮助。我最终解决它,如果self.tasks.qsize()== 0: 中断 – Evg 2010-09-06 12:12:08

+0

@Evg:小心,“任务队列是空的”是不一样的“没有更多的工作要做”... – dugres 2010-09-06 12:43:32

+0

是的thnx再次)我明白这一点,这是一个问题。在你的情况下“在流程结束时”。我必须检查空队列,如果它为空,请执行pool.add_task(None,None,None)。没有“停止任务”的想法不会让我活下去,我认为exsist的标志是 - 所有工人都有等待状态(在func,args,kargs = self.tasks.get()之前的行) )。如果发生这种情况,我可以打破所有工人的所有循环,你对此有何看法? – Evg 2010-09-06 15:52:10