Multiprocessing in Python not calling the worker function

I'm fairly new to multiprocessing. I wrote the script below, but the worker functions are never called and I don't understand what I'm missing.

What I want to do:

  1. Call two different methods asynchronously.
  2. Call one method before the other.

    # import all necessary modules
    import Queue
    import logging
    import multiprocessing
    import time, sys
    import signal

    debug = True

    def init_worker():
        signal.signal(signal.SIGINT, signal.SIG_IGN)

    research_name_id = {}
    ids = [55, 125, 428, 429, 430, 895, 572, 126, 833, 502, 404]
    # declare all the static variables
    num_threads = 2  # number of parallel threads

    minDelay = 3  # minimum delay
    maxDelay = 7  # maximum delay

    # declare an empty queue which will hold the publication ids
    queue = Queue.Queue(0)

    proxies = []
    #print (proxies)

    def split(a, n):
        """Function to split data evenly among threads"""
        k, m = len(a) / n, len(a) % n
        return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)]
                for i in xrange(n))

    def run_worker(i, data, queue, research_name_id, proxies, debug,
                   minDelay, maxDelay):
        """Function to pull out all publication links from nist
        data - research ids pulled using a different script
        queue - add the publication urls to the list
        research_name_id - dictionary with research id as key and name as value
        proxies - scraped proxies
        """
        print 'getLinks', i
        for d in data:
            print d
            queue.put(d)

    def fun_worker(i, queue, proxies, debug, minDelay, maxDelay):
        print 'publicationData', i
        try:
            print queue.pop()
        except:
            pass

    def main():
        print "Initializing workers"
        pool = multiprocessing.Pool(num_threads, init_worker)
        distributed_ids = list(split(list(ids), num_threads))
        for i in range(num_threads):
            data_thread = distributed_ids[i]
            print data_thread
            pool.apply_async(run_worker,
                             args=(i + 1, data_thread, queue, research_name_id,
                                   proxies, debug, minDelay, maxDelay))

            pool.apply_async(fun_worker,
                             args=(i + 1, queue, proxies, debug, minDelay,
                                   maxDelay))

        try:
            print "Waiting 10 seconds"
            time.sleep(10)
        except KeyboardInterrupt:
            print "Caught KeyboardInterrupt, terminating workers"
            pool.terminate()
            pool.join()
        else:
            print "Quitting normally"
            pool.close()
            pool.join()

    if __name__ == "__main__":
        main()

The only output I get is

    Initializing workers
    [55, 125, 428, 429, 430, 895]
    [572, 126, 833, 502, 404]
    Waiting 10 seconds
    Quitting normally

Answer


There are a couple of problems:

  1. You are not using multiprocessing.Queue.
  2. If you want to share a queue with subprocesses via apply_async etc., you need to use a manager (see example, and the sketch after this list).

Also note that apply_async reports errors only through the AsyncResult object it returns; since the code never calls .get() on those results, the failed tasks die silently, which is why you see no traceback at all.
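For illustration, here is a minimal sketch of the manager approach. The stripped-down run_worker (taking only an index, a data chunk, and the queue) stands in for the original:

    import multiprocessing

    def run_worker(i, data, queue):
        # Runs in a pool worker; the manager proxy forwards put() calls
        # back to the queue living in the manager process
        print 'getLinks', i
        for d in data:
            queue.put(d)

    if __name__ == '__main__':
        manager = multiprocessing.Manager()
        queue = manager.Queue()  # a proxy object that can be sent to pool workers
        pool = multiprocessing.Pool(2)
        result = pool.apply_async(run_worker, args=(1, [55, 125, 428], queue))
        result.get()  # re-raises any worker error instead of hiding it
        pool.close()
        pool.join()
        while not queue.empty():
            print queue.get()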

However, you should take a step back and ask yourself what you are trying to do. Is apply_async really the way to go? You have a list of items that you want to map over, applying some long-running, computation-intensive transformation (if the workers merely block on I/O, you could just as well use threads). It seems to me that imap_unordered is actually what you want:

    pool = multiprocessing.Pool(num_threads, init_worker)
    links = pool.imap_unordered(run_worker1, ids)
    output = pool.imap_unordered(fun_worker1, links)

run_worker1 and fun_worker1 need to be modified to take a single argument. If you need to share other data, then you should pass it in the initializer instead of passing it to the child processes over and over again.
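A minimal sketch of how that could look. Here run_worker1 and fun_worker1 are hypothetical single-argument versions of your functions (their bodies are placeholders), and the shared delay settings are passed once through the pool initializer:

    import multiprocessing
    import random
    import time

    settings = {}

    def init_worker(min_delay, max_delay):
        # Runs once in every child process; stash shared read-only data here
        settings['minDelay'] = min_delay
        settings['maxDelay'] = max_delay

    def run_worker1(research_id):
        # Placeholder first stage: wait a polite random delay, then
        # produce a publication link for one research id
        time.sleep(random.uniform(settings['minDelay'], settings['maxDelay']))
        return 'link-for-%d' % research_id

    def fun_worker1(link):
        # Placeholder second stage: process one link
        return 'processed %s' % link

    if __name__ == '__main__':
        ids = [55, 125, 428, 429, 430, 895, 572, 126, 833, 502, 404]
        pool = multiprocessing.Pool(2, init_worker, (3, 7))
        links = pool.imap_unordered(run_worker1, ids)
        output = pool.imap_unordered(fun_worker1, links)
        for result in output:
            print result
        pool.close()
        pool.join()

Because imap_unordered yields each result as soon as it is ready, the second stage starts consuming links before the first stage has finished all of the ids, and no explicit queue is needed at all.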


Thanks for your comment. I also want to start multiple processes. Is apply_async the right way to do that? I will read more about imap_unordered – nEO
