2013-11-24 97 views

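Python subprocess Popen deadlock in a multi-threaded environment

I have the following piece of code running inside a thread, where the "expand" C executable produces a unique string output for each input URL: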

from subprocess import Popen, PIPE

p = Popen(["expand", url], bufsize=65536, stdout=PIPE, stderr=PIPE, close_fds=True)
output, error = p.communicate()
print output

I have implemented a Queue-based multithreading solution that processes 5000 URLs in batches of 100.
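For readers who want something concrete to reason about, here is a minimal sketch of what a Queue-based worker pool around `Popen` could look like. It is not the asker's code, which was never posted. The names `worker` and `run_all`, the `None` sentinel, and the thread count are assumptions made for illustration, and it is written for Python 3.

```python
import queue
import subprocess
import sys
import threading


def worker(tasks, results):
    """Take commands off the queue, run each one, and store its output."""
    while True:
        args = tasks.get()
        try:
            if args is None:  # sentinel: no more work for this thread
                return
            proc = subprocess.Popen(args, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            out, _ = proc.communicate()
            results.put((args, out))
        except OSError:
            results.put((args, None))  # the program could not be started
        finally:
            tasks.task_done()  # always called, so tasks.join() can return


def run_all(commands, nthreads=4):
    """Run every command in commands on a pool of nthreads worker threads."""
    tasks, results = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=worker, args=(tasks, results))
               for _ in range(nthreads)]
    for t in threads:
        t.start()
    for args in commands:
        tasks.put(args)
    for _ in threads:
        tasks.put(None)  # one sentinel per worker
    tasks.join()
    for t in threads:
        t.join()
    return [results.get() for _ in range(len(commands))]
```

In a layout like this, `tasks.join()` returns only once every item has been matched by a `task_done()` call. A worker that blocks forever on a child process therefore keeps the main thread waiting in `join()`.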

When I run the script, it hangs, and ps -aef shows that 2 processes are still running:

1. 10177  5721 6662 6 09:25 pts/15 00:04:36 python expandPlaylist.py -s -t 
2. 10177 11004 5721 0 09:26 pts/15 00:00:00 expand http://www.sample.com 

Stack trace of the main Python script thread:

# ThreadID: 140332211570432 
File: "expandPlaylist.py", line 902, in <module> 
Main() 
File: "expandPlaylist.py", line 894, in Main 
startmain(db, c, conf) 
File: "expandPlaylist.py", line 834, in startmain 
stream_queue.join() 
File: "/usr/lib64/python2.7/Queue.py", line 82, in join 
self.all_tasks_done.wait() 
File: "/usr/lib64/python2.7/threading.py", line 238, in wait 
waiter.acquire() 

Stack trace of the thread that is deadlocked:

# ThreadID: 140332016596736 
File: "/usr/lib64/python2.7/threading.py", line 503, in __bootstrap 
self.__bootstrap_inner() 
File: "/usr/lib64/python2.7/threading.py", line 530, in __bootstrap_inner 
self.run() 
File: "expandPlaylist.py", line 120, in run 
self.process.wait() 
File: "/usr/lib64/python2.7/subprocess.py", line 1242, in wait 
pid, sts = _eintr_retry_call(os.waitpid, self.pid, 0) 
File: "/usr/lib64/python2.7/subprocess.py", line 471, in _eintr_retry_call 
return func(*args) 

GDB details for process ID 11004:

(gdb) bt 
#0 __lll_lock_wait() at ../nptl/sysdeps/unix/sysv/linux/x86_64/lowlevellock.S:136 
#1 0x00007fc36bd33294 in _L_lock_999() from /lib64/libpthread.so.0 
#2 0x00007fc36bd330aa in __pthread_mutex_lock (mutex=0x6a8c20) at pthread_mutex_lock.c:61 
#3 0x00007fc36c204dcd in g_mutex_lock (mutex=0x6a8c50) at gthread-posix.c:213 
#4 0x00007fc36c1b11df in g_source_unref_internal (source=0x844f90, context=0x6a8c50, have_lock=0) at gmain.c:1975 
#5 0x00007fc36c1b13e3 in g_source_unref (source=0x844f90) at gmain.c:2044 
#6 0x00007fc36cb475a9 in soup_session_dispose (object=0x61e100) at soup-session.c:305 
#7 0x00007fc36c4d270e in g_object_unref (_object=0x61e100) at gobject.c:3160 
#8 0x000000000040584b in dispose_session (parser=0x618020) at al_playlist_parser.c:859 
#9 0x0000000000403b0b in al_playlist_parser_dispose (obj=0x618020) at al_playlist_parser.c:129 
#10 0x00007fc36c4d270e in g_object_unref (_object=0x618020) at gobject.c:3160 
#11 0x0000000000403315 in main (argc=1, argv=0x7fff462cdca8) at al_expand.c:143 
  1. How can I avoid the deadlock?
  2. Otherwise, is there any way to attach a timeout to self.process.wait() and terminate that thread if the subprocess takes too long?
If you want us to help you find the bug, you must post your "Queue-based multithreading solution". – Max

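1. Try it with [`subprocess32`](https://pypi.python.org/pypi/subprocess32) to see whether the hang is caused by a bug that has already been fixed in the `subprocess` module. 2. [Stop reading process output in Python without hang?](http://stackoverflow.com/q/4417962/4279) – jfs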

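Can you post a minimal, complete code example that shows the problem? Even better if it is based on the code from [my answer to your previous question](http://stackoverflow.com/a/20064095/4279). – jfs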
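On the timeout part of the question: `subprocess32` is a backport of the Python 3 `subprocess` module, and since Python 3.3 `Popen.communicate()` and `Popen.wait()` accept a `timeout` argument. The sketch below uses the Python 3 standard library. The helper name `run_with_timeout` is made up for illustration, and whether the identical code runs on Python 2.7 via `import subprocess32 as subprocess` is an assumption that has not been checked here.

```python
import subprocess
import sys


def run_with_timeout(args, timeout):
    """Run args and return (returncode, stdout, stderr).

    If the child has not finished after timeout seconds, it is killed.
    """
    proc = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    try:
        out, err = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        # Collect whatever output is left and reap the killed child.
        out, err = proc.communicate()
    return proc.returncode, out, err
```

A call such as `run_with_timeout(["expand", url], timeout=60)` would then return even if the child gets stuck. The 60-second value is only a placeholder. Killing the child does not fix the lock contention that the GDB backtrace shows inside `expand` itself, it only keeps the Python worker thread from waiting forever.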

Answers


If you just need to call a subprocess on a list of arguments, I tend to do something like this:

#!/usr/bin/env python3 
# -*- coding: utf-8 -*- 
# 
# Author: R.F. Smith <[email protected]> 
# $Date: 2013-11-24 11:06:39 +0100 $ 
# 
# To the extent possible under law, Roland Smith has waived all copyright and 
# related or neighboring rights to vid2mp4.py. This work is published from the 
# Netherlands. See http://creativecommons.org/publicdomain/zero/1.0/ 

"""Convert all video files given on the command line to H.264/AAC streams in 
an MP4 container.""" 

from __future__ import print_function, division # for compatibility with Python 2. 

__version__ = '$Revision: cac4808 $'[11:-2] 

import os 
import sys 
import subprocess 
from multiprocessing import cpu_count 
from time import sleep 


def warn(s): 
    """Print a warning message. 

    :param s: Message string 
    """ 
    s = ' '.join(['Warning:', s]) 
    print(s, file=sys.stderr) 


def checkfor(args, rv=0): 
    """Make sure that a program necessary for using this script is 
    available. 

    :param args: String or list of strings of commands. A single string may 
    not contain spaces. 
    :param rv: Expected return value from evoking the command. 
    """ 
    if isinstance(args, str):
        if ' ' in args:
            raise ValueError('no spaces in single command allowed')
        args = [args]
    try:
        with open(os.devnull, 'w') as bb:
            rc = subprocess.call(args, stdout=bb, stderr=bb)
        if rc != rv:
            raise OSError
    except OSError as oops:
        outs = "Required program '{}' not found: {}."
        print(outs.format(args[0], oops.strerror))
        sys.exit(1)


def startencoder(fname): 
    """Use ffmpeg to convert a video file to H.264/AAC 
    streams in an MP4 container. 

    :param fname: Name of the file to convert. 
    :returns: a 3-tuple of a Process, input path and output path 
    """ 
    basename, ext = os.path.splitext(fname)
    known = ['.mp4', '.avi', '.wmv', '.flv', '.mpg', '.mpeg', '.mov', '.ogv']
    if ext.lower() not in known:
        warn("File {} has unknown extension, ignoring it.".format(fname))
        return (None, fname, None)
    ofn = basename + '.mp4'
    args = ['ffmpeg', '-i', fname, '-c:v', 'libx264', '-crf', '29', '-flags',
            '+aic+mv4', '-c:a', 'libfaac', '-sn', ofn]
    p = None  # stays None if ffmpeg cannot be started
    with open(os.devnull, 'w') as bitbucket:
        try:
            p = subprocess.Popen(args, stdout=bitbucket, stderr=bitbucket)
            print("Conversion of {} to {} started.".format(fname, ofn))
        except OSError:
            warn("Starting conversion of {} failed.".format(fname))
    return (p, fname, ofn)


def manageprocs(proclist): 
    """Check a list of subprocesses tuples for processes that have ended and 
    remove them from the list. 

    :param proclist: a list of (process, input filename, output filename) 
    tuples. 
    """ 
    print('# of conversions running: {}\r'.format(len(proclist)), end='')
    sys.stdout.flush()
    # Iterate over a copy: removing items from a list while iterating over
    # it skips the element that follows each removed one.
    for p in proclist[:]:
        pr, ifn, ofn = p
        if pr is None:
            proclist.remove(p)
        elif pr.poll() is not None:
            print('Conversion of {} to {} finished.'.format(ifn, ofn))
            proclist.remove(p)
    sleep(0.5)


def main(argv): 
    """Main program. 

    :param argv: command line arguments 
    """ 
    if len(argv) == 1:
        binary = os.path.basename(argv[0])
        print("{} version {}".format(binary, __version__), file=sys.stderr)
        print("Usage: {} [file ...]".format(binary), file=sys.stderr)
        sys.exit(0)
    checkfor(['ffmpeg', '-version'])
    avis = argv[1:]
    procs = []
    maxprocs = cpu_count()
    for ifile in avis:
        while len(procs) == maxprocs:
            manageprocs(procs)
        procs.append(startencoder(ifile))
    while len(procs) > 0:
        manageprocs(procs)


if __name__ == '__main__': 
    main(sys.argv) 

If hanging processes are a problem, you could adapt manageprocs to kill a subprocess after a certain amount of time.
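One way that adaptation might look is sketched below. This is an illustration and not part of the script above. The helpers `start` and `reap` and the `(process, start time)` tuple layout are assumptions introduced here. The idea is to record when each child was started and to kill any child whose `poll()` still returns `None` after the deadline.

```python
import subprocess
import sys
import time


def start(args):
    """Start a child process and remember when it was started."""
    return (subprocess.Popen(args), time.time())


def reap(proclist, timeout):
    """Remove finished children from proclist; kill those running too long.

    proclist holds (Popen, start time) tuples as returned by start().
    Returns the number of children that had to be killed.
    """
    killed = 0
    for item in proclist[:]:  # iterate over a copy while removing
        proc, started = item
        if proc.poll() is None:
            if time.time() - started < timeout:
                continue  # still running and within its time budget
            proc.kill()
            proc.wait()  # reap the child so it does not linger as a zombie
            killed += 1
        proclist.remove(item)
    return killed
```

Calling `reap(procs, timeout=60)` in a loop with a short `sleep` between calls plays the same role as `manageprocs` above. The 60-second limit is only an example value.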

I tried using the snippet from your code, but it didn't help in my case, because `process.poll()` always returns `None` for a few (hanging) threads. Anyway, many thanks! –