2017-01-23 107 views
0

我有一个长期运行的计算模型,我希望控制,将数据输入到数据并使用STDIN和读取数据。在这个外部代码里面,有一个控制反馈循环需要从STDIN每100ms左右的新数据。写入到标准输入并从标准输出中读取长时间运行的子进程中的python

因此,subprocess.communicate()是不合适的,因为它等待过程完成。该过程的估计运行时间为数周。

下面的代码不起作用,因为控制挂在stdout.read()并永不回来。

通过STDIN和STDOUT讨论的正确方法是什么?

import subprocess as sb 


class Foo: 
    def process_output(self,values: str) ->(): 
     """ gets 7 comma separated floats back from ADC.elf and returns them as a tuple of two vectors """ 
     floats = [float(f) for f in values.split(',') if values and f] 
     # if len(floats) == 7: 
     mag = (floats[0], floats[1], floats[2]) 
     quat = (floats[3], floats[4], floats[5], floats[6]) 
     return (mag, quat) 

    def format_input(self,invals: {}) -> bytes: 
     """ takes a dict of arrays and stuffs them into a comma-separated bytestring to send to ADC.elf with a trailing newline""" 
     concat = lambda s, f: ''.join([f % x for x in s]) 
     retval = '' 
     retval += concat(invals['mag_meas'], '%3.2f,') 
     retval += concat(invals['euler_angle'], '%3.2f,') 
     retval += concat(invals['sun_meas'], '%3.2f,') 
     retval += concat(invals['epoch'], '%02.0f,') 
     retval += concat(invals['lla'], '%3.2f,') 
     retval += concat([invals['s_flag']], '%1.0f,') 
     retval = retval[:-1] 
     retval += '\n' 
     return retval.encode('utf-8') 

    def page(self,input: {}) ->(): 
     """ send a bytestring with 19 floats to ADC.elf. Process the returned 7 floats into a data struture""" 
     formatted = self.format_input(input) 
     self.pid.stdin.write(formatted) 
     response = self.pid.stdout.read() 

     return self.process_output(response.decode()) 

    def __init__(self): 
     """ start the long-running process ADC.elf that waits for input and performs some scientific computation""" 
     self.pid = sb.Popen(args=['./ADC.elf'], stdin=sb.PIPE, stdout=sb.PIPE, stderr=sb.PIPE) 

    def exit(self): 
     """ send SIGTERM to ADC.elf""" 
     self.pid.terminate() 



if __name__ == "__main__": 
    # some dummy data 
    testData = {'mag_meas': [1, 2, 3], 
       'euler_angle': [4, 5, 6], 
       'sun_meas': [7, 8, 9], 
       'epoch': [0, 1, 2, 3, 4, 5], 
       'lla': [6, 7, 8], 
       's_flag': 9 
       } 
    #initialize 
    runner = Foo() 
    # send and receive once. 
    result = runner.page(testData) 
    print(result) 
    #clean up 
    runner.exit() 
+0

['沟通()'](https://github.com/python/cpython /blob/eb70a87363193a22a2ee36a44efd8372f97efeae/Lib/subprocess.py#L1051)本身使用线程。 ['select'](https://docs.python.org/2/library/select.html)也是一个选项。正如非阻塞IO。 – dhke

+0

@dhke这是真的,但沟通()将不会返回,直到过程完成;我想在它的单个执行过程中多次查询它。 – gvoysey

+0

因此,我的建议是:看看如何沟通,并适应您的需求。 'communic()'显式等待读者线程。放下那个,你有连续的投票。 – dhke

回答

0

不知道如何与子直接做到这一点,但pexpect做完全正确的事情:

import pexpect, os 
from time import sleep 

class Foo: 
    def process_output(self,values: str) ->(): 
     floats = [float(f) for f in values.split(',') if values and f] 
     # if len(floats) == 7: 
     mag = (floats[0], floats[1], floats[2]) 
     quat = (floats[3], floats[4], floats[5], floats[6]) 
     return (mag, quat) 

    def format_input(self,invals: {}) -> bytes: 
     concat = lambda s, f: ''.join([f % x for x in s]) 
     retval = '' 
     retval += concat(invals['mag_meas'], '%3.2f,') 
     retval += concat(invals['euler_angle'], '%3.2f,') 
     retval += concat(invals['sun_meas'], '%3.2f,') 
     retval += concat(invals['epoch'], '%02.0f,') 
     retval += concat(invals['lla'], '%3.2f,') 
     retval += concat([invals['s_flag']], '%1.0f,') 
     retval = retval[:-1] 
     retval += '\n' 
     return retval.encode('utf-8') 

    def page(self,input: {}) ->(): 
     formatted = self.format_input(input) 
     self.pid.write(formatted) 
     response = self.pid.readline() 

     return self.process_output(response.decode()) 

    def __init__(self): 

     self.pid = pexpect.spawn('./ADC.elf') 
     self.pid.setecho(False) 

    def exit(self): 
     self.pid.terminate() 



if __name__ == "__main__": 
    testData = {'mag_meas': [1, 2, 3], 
       'euler_angle': [4, 5, 6], 
       'sun_meas': [7, 8, 9], 
       'epoch': [0, 1, 2, 3, 4, 5], 
       'lla': [6, 7, 8], 
       's_flag': 9 
       } 
    runner = Foo() 
    i = 0 
    while i < 100: 
     result = runner.page(testData) 
     print(result) 
     i += 1 
     sleep(.1) 



    runner.exit() 
相关问题