2011-05-14 95 views

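I have a function in Jython that uses Popen to run another program. That program writes an XML file to its stdout, which is redirected to a file. When the process finishes, I close the file and call another function to parse it. During parsing I was getting a bunch of error messages about accessing closed files and/or badly formed XML files (which look fine when I open them). I thought output.close() might be returning before the file was actually closed, so I added a loop that waits until output.closed is true. This seemed to work at first, but then my program printed the following NIO errors while parsing the XML file: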

blasting 
blasted 
parsing 
parsed 
    Extending genes found via genemark, 10.00% done 
blasting 
blasted 
parsing 
Exception in thread "_CouplerThread-7 (stdout)" Traceback (most recent call last): 
    File "/Users/mbsulli/jython/Lib/subprocess.py", line 675, in run 
    self.write_func(buf) 
IOError: java.nio.channels.AsynchronousCloseException 
[Fatal Error] 17_2_corr.blastp.xml:15902:63: XML document structures must start and end within the same entity. 
Retry 
blasting 
blasted 
parsing 
Exception in thread "_CouplerThread-9 (stdout)" Traceback (most recent call last): 
    File "/Users/mbsulli/jython/Lib/subprocess.py", line 675, in run 
    self.write_func(buf) 
IOError: java.nio.channels.ClosedChannelException 
[Fatal Error] 17_2_corr.blastp.xml:15890:30: XML document structures must start and end within the same entity. 
Retry 
blasting 

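I don't know what my options are from here. Am I right in thinking that the XML has not been fully written by the time I parse it? If so, how can I make sure that it has?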

def parseBlast(fileName): 
    """ 
    A function for parsing XML blast output. 
    """ 
    print "parsing" 
    reader = XMLReaderFactory.createXMLReader() 
    reader.entityResolver = reader.contentHandler = BlastHandler() 
    reader.parse(fileName) 
    print "parsed" 

    return dict(map(lambda iteration: (iteration.query, iteration), reader.getContentHandler().iterations)) 

def cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote = False, force = False): 
    """ 
    Performs a blast search using the blastp executable and database in blastLocation on 
    the query with the eValue. The result is an XML file saved to fileName. If fileName 
    already exists the search is skipped. If remote is true then the search is done remotely. 
    """ 
    if not os.path.isfile(fileName) or force:
        output = open(fileName, "w")
        command = [blastLocation + "/bin/blastp",
                   "-evalue", str(eValue),
                   "-outfmt", "5",
                   "-query", query]
        if remote:
            command += ["-remote",
                        "-db", database]
        else:
            command += ["-num_threads", str(Runtime.getRuntime().availableProcessors()),
                        "-db", database]
        print "blasting"
        blastProcess = subprocess.Popen(command,
                                        stdout = output)
        while blastProcess.poll() is None:
            if pipeline.exception:
                print "Stopping in blast"
                blastProcess.kill()
                output.close()
                raise pipeline.exception
        output.close()
        while not output.closed:
            pass
        print "blasted"
    try:
        return parseBlast(fileName)
    except SAXParseException:
        print 'Retry'
        return cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote, True)
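
One way to test the suspicion above, that the file is being parsed before it is complete, is to check whether the file on disk is well-formed before handing it to the real parser. The following is only a minimal sketch using Python's standard xml.sax module. The name is_well_formed is hypothetical and not part of the original code, and the sketch has not been tried under Jython.

```python
import xml.sax


def is_well_formed(file_name):
    """Return True if file_name parses as well-formed XML, else False.

    Hypothetical helper for illustration; it only checks structure and
    does not extract any BLAST results.
    """
    try:
        # A bare ContentHandler ignores all events; we only care whether
        # the parser reaches the end of the document without an error.
        xml.sax.parse(file_name, xml.sax.ContentHandler())
        return True
    except xml.sax.SAXParseException:
        # Raised for truncated or otherwise malformed documents.
        return False
```

If this returns False right after the process exits but True a moment later, that would support the idea that output was still being written when the file was closed.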

Answers


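I think the problem started when I switched from calling wait on the subprocess to using the poll method, so that I could stop the process while it was running. It's hard to say for sure, because I already had results for many of the data sets I was working with and would have to run the subprocess again to check. In any case, my guess is that output was still being written when I closed the file. My solution was to switch to a pipe and write the file myself.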

def cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote = False, force = False):
    """
    Performs a blast search using the blastp executable and database in blastLocation on
    the query with the eValue. The result is an XML file saved to fileName. If fileName
    already exists the search is skipped. If remote is true then the search is done remotely.
    """
    if not os.path.isfile(fileName) or force:
        output = open(fileName, "w")
        command = [blastLocation + "/bin/blastp",
                   "-evalue", str(eValue),
                   "-outfmt", "5",
                   "-query", query]
        if remote:
            command += ["-remote",
                        "-db", database]
        else:
            command += ["-num_threads", str(Runtime.getRuntime().availableProcessors()),
                        "-db", database]
        blastProcess = subprocess.Popen(command,
                                        stdout = subprocess.PIPE)
        while blastProcess.poll() is None:
            output.write(blastProcess.stdout.read())
            if pipeline.exception:
                psProcess = subprocess.Popen(["ps", "aux"], stdout = subprocess.PIPE)
                awkProcess = subprocess.Popen(["awk", "/" + " ".join(command).replace("/", "\\/") + "/"], stdin = psProcess.stdout, stdout = subprocess.PIPE)
                for line in awkProcess.stdout:
                    subprocess.Popen(["kill", "-9", re.split(r"\s+", line)[1]])
                output.close()
                raise pipeline.exception
        remaining = blastProcess.stdout.read()
        while remaining:
            output.write(remaining)
            remaining = blastProcess.stdout.read()

        output.close()

    try:
        return parseBlast(fileName)
    except SAXParseException:
        return cachedBlast(fileName, blastLocation, database, eValue, query, pipeline, remote, True)
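
The idea described above, reading the child's stdout through a pipe and writing the file yourself, can also be sketched in a more compact form. This is an illustration rather than a drop-in replacement. The names run_to_file and should_stop are hypothetical, with should_stop standing in for the pipeline.exception check. It is written against the standard Python subprocess module and has not been tried under Jython.

```python
import subprocess
import sys


def run_to_file(command, file_name, should_stop=lambda: False, chunk_size=8192):
    """Run command, copy its stdout into file_name, and return the exit code.

    should_stop is a hypothetical callback (a stand-in for checking
    pipeline.exception); it is consulted between chunks.
    """
    process = subprocess.Popen(command, stdout=subprocess.PIPE)
    output = open(file_name, "wb")
    try:
        while True:
            if should_stop():
                process.kill()
                break
            # read() blocks until chunk_size bytes arrive or the child
            # closes its stdout, so the stop check only runs between chunks.
            chunk = process.stdout.read(chunk_size)
            if not chunk:
                # An empty read means end of file: everything the child
                # wrote has now been copied.
                break
            output.write(chunk)
    finally:
        # The file is closed only after the copy loop has finished.
        output.close()
        process.stdout.close()
    return process.wait()
```

Because the same thread both reads the pipe and closes the file, the file cannot be closed while a copy is still in progress. The trade-off is that a child that produces no output for a long time delays the stop check.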