2012-05-21 60 views
0

我反复重写了我的小型 Python 应用程序,但我目前的 Python 水平还不够。最初它是一个单线程程序,用 Beautiful Soup 作为解析器,后来我换成了 lxml,并把脚本改成了多线程。我也了解到了 Twisted,但没能把这个小片段改写成 Twisted 的形式。我把代码贴在这里,希望大家能给我指点一个更好的方向,让它再快一点。目前抓取 15 万个页面大约需要 1 小时。考虑到我第一次写的版本比现在慢 3 倍,我对这个结果还算满意。——《数据挖掘:多线程与多进程》

#! /usr/bin/python 
# coding: ISO-8859-1 
import time, PySQLPool, Queue, threading 
from urllib3 import connection_from_url 
from lxml import etree 
import cStringIO as StringIO 

# Request headers sent with every page fetch: a desktop User-Agent plus
# gzip/deflate support so fddb.info serves compressed full pages.
headers = { 
      'User-Agent'   : 'Mozilla/4.77 [en] (X11; I; IRIX;64 6.5 IP30)', 
      'Accept'    : 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 
      'Accept-Language' : 'en-us;q=0.5,en;q=0.3', 
      'Accept-Encoding' : 'gzip, deflate', 
      'Accept-Charset'  : 'utf-8;q=0.7,*;q=0.7' 
} 

t = time.time()  # wall-clock start; read again in the final report line
PySQLPool.getNewPool().maxActiveConnections = 60 
# Shared MySQL connection handle; every PySQLPool query object is built from it.
db = PySQLPool.getNewConnection(username='user', password='pass', host='127.0.0.1', db='fddb') 
# One keep-alive HTTP connection pool shared by all downloader threads.
pool = connection_from_url('http://fddb.info/', maxsize=60, timeout=150, headers=headers) 
detailCounter = 0  # pages parsed so far; incremented by DatamineThread workers
urls = {}  # NOTE(review): unused at module level -- main() binds its own local `urls`
queue = Queue.Queue()      # work items [foods_url, foods_id] awaiting download
out_queue = Queue.Queue()  # results [html, foods_id] awaiting parsing

# Maps the German nutrition labels scraped from fddb.info to the column
# names of the `details` table.
clean_rows = {
    "Brennwert": "details_brennwert",
    "Kalorien": "details_kalorien",
    "Protein": "details_protein",
    "Kohlenhydrate": "details_kohlenhydrate",
    "davon Zucker": "details_zucker",
    "davon Polyole": "details_polyole",
    "Fett": "details_fett",
    "Ballaststoffe": "details_ballaststoffe",
    "Broteinheiten": "details_broteinheit",
    "Alkohol": "details_alkohol",
    "Cholesterin": "details_cholesterin",
    "Koffein": "details_koffein",
    "Wassergehalt": "details_wasser",
    "Vitamin C": "details_vitc",
    "Vitamin A": "details_vita",
    "Vitamin D": "details_vitd",
    "Vitamin E": "details_vite",
    "Vitamin B1": "details_vitb1",
    "Vitamin B2": "details_vitb2",
    "Vitamin B6": "details_vitb6",
    "Vitamin B12": "details_vitb12",
    "Natrium": "details_natrium",
    "Eisen": "details_eisen",
    "Zink": "details_zink",
    "Magnesium": "details_magnesium",
    "Chlor": "details_chlor",
    "Mangan": "details_mangan",
    "Schwefel": "details_schwefel",
    "Kalium": "details_kalium",
    "Kalzium": "details_kalzium",
    "Phosphor": "details_phosphor",
    "Kupfer": "details_kupfer",
    "Fluor": "details_fluor"
}


def rows_escape(text):
    """Translate every known German label in *text* into its DB column
    name (via clean_rows) and strip trailing whitespace."""
    for label, column in clean_rows.items():
        text = text.replace(label, column)
    return text.rstrip()

# Normalisation table for scraped values: unit suffixes disappear, the
# decimal comma becomes a dot, and the micro sign (already detected by
# the parser before calling this) is dropped.
clean_values = {
    "kJ": "",
    "kcal": "",
    "g": "",
    "mg": "",
    "%": "",
    ",": ".",
    u"\u03bc": ""
}


def values_escape(text):
    """Strip unit suffixes from *text*, normalise the decimal separator
    to a dot and remove trailing whitespace."""
    for token, replacement in clean_values.items():
        text = text.replace(token, replacement)
    return text.rstrip()

def insertDetails(container, foods_id):
    """Persist one food's parsed nutrition values and mark it as scraped.

    container -- sequence of {'row': column_name, 'value': text} dicts;
                 column names come from the clean_rows mapping.
    foods_id  -- primary key of the row in `foods` being updated.
    """
    c = PySQLPool.getNewQuery(db)
    # Column identifiers cannot be bound as parameters; they are produced by
    # rows_escape(). NOTE(review): a scraped label missing from clean_rows
    # passes through unmapped -- consider rejecting rows whose 'row' is not a
    # clean_rows value before building the column list.
    query_rows = ''.join(item['row'] + ',' for item in container)
    # The values are scraped (untrusted) page text: bind them as query
    # parameters instead of interpolating them into the SQL string.
    placeholders = '%s,' * len(container)
    values = [item['value'] for item in container]

    c.Query("INSERT INTO details (%sdetails_id,foods_id) VALUES (%sNULL,%%s)"
            % (query_rows, placeholders),
            values + [foods_id])
    c.Query("UPDATE foods SET foods_check = '1' WHERE foods_id=%s", [foods_id])

def getHP(url):
    """Fetch one page (path relative to the site root) through the shared
    urllib3 pool and return the raw response body."""
    response = pool.request('GET', '/' + url)
    return response.data

class ThreadUrl(threading.Thread):
    """Downloader worker: takes [foods_url, foods_id] items off *queue*,
    fetches the page and hands [html, foods_id] to *out_queue*."""

    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            url, foods_id = self.queue.get()
            page = getHP(url)
            self.out_queue.put([page, foods_id])
            self.queue.task_done()

class DatamineThread(threading.Thread):
    """Parser worker: takes [html, foods_id] items off *out_queue*, extracts
    the nutrition table from the page and writes it to the database."""

    # Several DatamineThreads run concurrently and `detailCounter += 1` on a
    # module global is a non-atomic read-modify-write; guard it with a lock
    # shared by all instances so no increments are lost.
    _counter_lock = threading.Lock()

    def __init__(self, out_queue):
        threading.Thread.__init__(self)
        self.out_queue = out_queue

    def run(self):
        global detailCounter
        while True:
            data, foods_id = self.out_queue.get()

            container = []
            # Pages are served as cp1252; parse each message with its own
            # parser instance (lxml parsers are not shared across threads).
            parser = etree.HTMLParser(encoding='cp1252')
            tree = etree.parse(StringIO.StringIO(data), parser)
            divx = tree.xpath('//div[@style="background-color:#f0f5f9;padding:2px 4px;" or @style="padding:2px 4px;"]')

            for xdiv in divx:
                # Wrap the matched <div> so the absolute XPaths below search
                # only inside this one nutrition row.
                x = etree.ElementTree(element=xdiv, parser=parser)

                value = x.xpath('string(//div/text())')
                label = x.xpath('string(//*[self::a or self::span]/text())')

                label = rows_escape(label)

                if not "[nodata]" in value:
                    if u"\u03bc" in value:
                        # Micrograms: strip units, then convert to milligrams.
                        milligrams = float(values_escape(value)) / 1000
                        container.append({'row': label, 'value': str(milligrams)})
                    else:
                        container.append({'row': label, 'value': values_escape(value)})

            with DatamineThread._counter_lock:
                detailCounter += 1
            insertDetails(tuple(container), foods_id)

            self.out_queue.task_done()

def main(): 

    c = PySQLPool.getNewQuery(db) 
    c.Query("SELECT foods_id, foods_url FROM foods WHERE foods_check = 0") 
    urls = c.record 

    for i in range(6): 
     t = ThreadUrl(queue, out_queue) 
     t.setDaemon(True) 
     t.start() 

    for item in urls: 
     queue.put([item['foods_url'], item['foods_id']]) 

    for i in range(6): 
     dt = DatamineThread(out_queue) 
     dt.setDaemon(True) 
     dt.start() 

    queue.join() 
    out_queue.join() 

main() 
db.close 
print "Zeit: %.2f New Details: %d" % (time.time()-t, detailCounter) 
+0

在您盲目尝试优化之前,先对您的应用程序执行dbfetch/memory/cputime分析并确定真正的瓶颈。 – moooeeeep

回答

1

我建议你使用多处理器模块,如果你有多个CPU,如果你的程序似乎是非常CPU密集型。由于全局解释器锁(Global Interpreter Lock)或GIL,Python在多线程方面出了名,这在很大程度上确保了在任何给定的时间内,在一个进程中只能有一个python执行线程。

+0

嗯,我都试过了,多进程(multiprocessing)和 Twisted。但不知为何,它们反而让我贴出来的这个脚本变慢了。 – Cango

+0

嗯,还有:因为 URL 抓取或者 CPU 会成为你的瓶颈,我建议你启动 2 倍或 3 倍于 CPU 核心数的进程,每个进程在循环中先抓取一个 URL,然后处理它。任何额外的同步开销肯定会拖垮你的性能。 –