2015-04-02 152 views
2

更新:在dano的帮助下,我解决了这个问题。如何在多处理进程完成后退出python脚本?

我没有调用生产者join(),它使我的脚本挂起。 只需要添加一个行达诺说:

... 
producer = multiprocessing.Process(target=produce,args=(file_queue,row_queue)) 
producer.daemon = True 
producer.start() 
... 

旧脚本:

import multiprocessing 
import Queue 

QUEUE_SIZE = 2000 


def produce(file_queue, row_queue,): 

    while not file_queue.empty(): 
     src_file = file_queue.get() 
     zip_reader = gzip.open(src_file, 'rb') 

     try: 
      csv_reader = csv.reader(zip_reader, delimiter=SDP_DELIMITER) 

      for row in csv_reader: 
       new_row = process_sdp_row(row) 
       if new_row: 
        row_queue.put(new_row) 
     finally: 
      zip_reader.close() 


def consume(row_queue): 
    '''processes all rows, once queue is empty, break the infinit loop''' 
    while True: 
     try: 
      # takes a row from queue and process it 
      pass 
     except multiprocessing.TimeoutError as toe: 
      print "timeout, all rows have been processed, quit." 
      break 
     except Queue.Empty: 
      print "all rows have been processed, quit." 
      break 
     except Exception as e: 
      print "critical error" 
      print e 
      break 


def main(args): 

    file_queue = multiprocessing.Queue() 
    row_queue = multiprocessing.Queue(QUEUE_SIZE) 

    file_queue.put(file1) 
    file_queue.put(file2) 
    file_queue.put(file3) 

    # starts 3 producers 
    for i in xrange(4): 
     producer = multiprocessing.Process(target=produce,args=(file_queue,row_queue)) 
     producer.start() 

    # starts 1 consumer 
    consumer = multiprocessing.Process(target=consume,args=(row_queue,)) 
    consumer.start() 

    # blocks main thread until consumer process finished 
    consumer.join() 

    # prints statistics results after consumer is done 

    sys.exit(0) 


if __name__ == "__main__": 
    main(sys.argv[1:]) 

目的

我使用python 2.7multiprocessing生成3个生产者读取3个文件的同时,然后将文件行放入row_queue中,并生成1位消费者对所有行执行更多处理。打印统计结果在消费者完成后在主线程中产生,所以我使用join()方法。最后调用sys.exit(0)来退出脚本。

问题: 无法退出脚本。

我试图用print "the end"代替sys.exit(0),在控制台上显示“结束”。难道我做错了什么?为什么脚本不会退出,以及如何让它退出?由于

+1

这里没有足够的信息可以肯定地说,但最可能的问题是一个或多个'Producer'进程仍在运行。尝试通过在调用'producer之前添加'producer.daemon = True'来使它们守护进程。start()',看脚本是否会退出。 – dano 2015-04-02 16:35:11

+0

或者,为每个生产者调用'join()'以及...假设它们通常会终止。 – twalberg 2015-04-02 16:40:13

+0

@twalberg如果它们最终结束,脚本将不需要调用'join()'而退出。当解释器退出时,Python隐式地在非守护进程子进程上调用join()。 – dano 2015-04-02 16:41:34

回答

0

producers没有multiprocessing.Process.daemon属性格式设置:

守护

进程的守护进程的标志,一个布尔值。这必须在start()被调用之前设置。

初始值从创建过程继承。

当进程退出时,它将尝试终止其所有守护进程的子进程。

请注意,不允许守护进程创建子进程。否则,守护进程会在其父进程退出时终止其子进程。另外,这些不是Unix守护进程或服务,它们是正常的进程,如果非守护进程退出,它们将被终止(而不是加入)。

https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.daemon

只需添加producer.daemon = True

... 
producer = multiprocessing.Process(target=produce,args=(file_queue,row_queue)) 
producer.daemon = True 
producer.start() 
... 

这应该有可能使当consumer被加入整个程序结束。

顺便说一下,你应该也可能是join的制作者。

+0

如果生产者没有自己退出,调用'join'将会使程序挂起。他需要修复那些阻止他们退出以便调用'join'不会导致挂起的情况,在这种情况下,不需要首先使他们成为守护进程。 – dano 2015-04-02 16:57:07

+0

@dano是的,你是对的。但是由于我们不知道完整的“产品”功能是怎样的,我只是试图保持简单。我倾向于为我的'Processes'使用毒丸,然后加入它们,但我也将它们设置为'daemon = True',所以当父母在发出毒丸之前意外退出时,它们不会挂起。但随意downvote .. – 2015-04-02 17:01:08

+0

@dano @Jan Spurny嗨,大家好,我通过我的'produce()'方法,你能检查出来吗? – haifzhan 2015-04-02 17:08:40