2011-05-06 79 views
12

I'm trying hard to understand the Pythonic way to solve this simple problem: piping a large amount of data to the stdin of a subprocess.Popen.

My problem is simple. If you use the following code, it hangs. This is well documented in the subprocess module documentation.

import subprocess 

proc = subprocess.Popen(['cat','-'], 
         stdin=subprocess.PIPE, 
         stdout=subprocess.PIPE, 
         ) 
for i in range(100000): 
    proc.stdin.write('%d\n' % i) 
output = proc.communicate()[0] 
print output 

Looking around for a solution (there was a very insightful thread, but I've since lost it), I found this solution (among others) that uses an explicit fork:

import os 
import sys 
from subprocess import Popen, PIPE 

def produce(to_sed):
    for i in range(100000):
        to_sed.write("%d\n" % i)
        to_sed.flush()
    # this would happen implicitly, anyway, but is here for the example
    to_sed.close()

def consume(from_sed):
    while 1:
        res = from_sed.readline()
        if not res:
            sys.exit(0)
            #sys.exit(proc.poll())
        print 'received: ', [res]

def main():
    proc = Popen(['cat','-'], stdin=PIPE, stdout=PIPE)
    to_sed = proc.stdin
    from_sed = proc.stdout

    pid = os.fork()
    if pid == 0:
        from_sed.close()
        produce(to_sed)
        return
    else:
        to_sed.close()
        consume(from_sed)

if __name__ == '__main__':
    main()

This solution is conceptually very easy to understand, but it uses one extra process and is far too low-level compared to the subprocess module (which exists precisely to hide this kind of thing...).

I'd like to know: is there a simple, clean solution using the subprocess module that won't hang, or, to implement this pattern, do I have to take a step back and write an old-style select loop or an explicit fork?

Thanks

+1

You could use threads instead of fork (better compatibility with non-UNIX, arguably more readable), but apart from that I think the example you give is fine. A select loop could also "multiplex" the operations in a single thread, but it wouldn't be simpler than this. – wump 2011-05-06 12:59:47

+0

Blocking upfront with 'Popen.wait()' should cause a deadlock (and hang), but I used 'Popen.communicate()' to get rid of that. I believe it uses some internal polling loop to stuff the data into the buffers. Does it actually hang when you try it, or does it just take a very long time to run? – 2011-05-06 13:05:22

+0

uhmmm... since the subprocess module is an abstraction over low-level process management, I'm surprised it doesn't cover this simple use case. – 2011-05-06 14:13:36

Answers

8

If you want a pure Python solution, you need to put either the reader or the writer in a separate thread. The threading package is a lightweight way to do that, with convenient access to common objects and no messy forking.

import subprocess 
import threading 
import sys 

proc = subprocess.Popen(['cat','-'], 
         stdin=subprocess.PIPE, 
         stdout=subprocess.PIPE, 
         ) 
def writer(): 
    for i in range(100000):
        proc.stdin.write('%d\n' % i)
    proc.stdin.close()
thread = threading.Thread(target=writer) 
thread.start() 
for line in proc.stdout: 
    sys.stdout.write(line) 
thread.join() 
proc.wait() 

It might be neat to see the subprocess module modernized to support streams and coroutines; that would allow pipelines mixing Python pieces and shell pieces to be constructed more elegantly.

+0

Just in case it's not completely obvious: if you don't need the output in Python, drop 'stdout=PIPE'; then you don't need a separate thread and you can write to 'proc.stdin' in the same thread. Unrelated: use 'with proc.stdin:' to close it even if an exception happens during a write. – jfs 2016-06-10 16:40:11
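
A minimal sketch of that suggestion (my illustration, not code from the comment), in the Python 2 style of the answer above: with no stdout=PIPE there is nothing to drain, so a single thread is enough.

import subprocess

# no stdout=PIPE: the child's output goes straight to our stdout
proc = subprocess.Popen(['cat', '-'], stdin=subprocess.PIPE)
with proc.stdin:  # closes stdin even if a write raises
    for i in range(100000):
        proc.stdin.write('%d\n' % i)
proc.wait()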

1

For this kind of thing, the shell works far better than subprocess.

Write very simple Python applications that read from sys.stdin and write to sys.stdout.

Connect the simple applications together with a shell pipeline.

If you need to, start the pipeline with subprocess, or just write a one-line shell script, as shown below.

python part1.py | python part2.py 

This is very, very efficient. And as long as you keep it very simple, it's also portable to all Linuxes (and Windows).
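
For illustration, the two halves might look like this (hypothetical contents for the part1.py/part2.py named above):

# part1.py -- producer: write records to sys.stdout
import sys

for i in range(100000):
    sys.stdout.write('%d\n' % i)

# part2.py -- consumer: read records from sys.stdin, one at a time
import sys

for line in sys.stdin:
    sys.stdout.write('received: %s' % line)

Each piece stays trivial, and the OS pipe between them takes care of all the buffering and flow control.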

+0

I know there are 1001 ways to do this. I'm asking for the Pythonic way :) Call me a purist :) – 2011-05-06 12:35:05

+0

@user741720: I gave you the Pythonic solution: use 'sys.stdin' and 'sys.stdout', and avoid needlessly complex 'subprocess' code. The purist approach is to write as little code as possible, and to write that little bit as cleanly as possible. The OS does it best (fastest, with the least overhead) when you don't inject extra Python processing into the middle of already highly optimized OS code. – 2011-05-06 12:38:15

0

Here's an example that uses a pipe to read gzip output one record at a time (Python 3):

from subprocess import Popen, PIPE

cmd = 'gzip -dc compressed_file.gz'
# the command must be a list of arguments (or use shell=True), hence the split()
pipe = Popen(cmd.split(), stdout=PIPE).stdout

for line in pipe: 
    print(":", line.decode(), end="") 

I know there's a standard module for gzip; this is just meant as an example. You can use the communicate method to read the whole output in one go (like shell back-ticks), but obviously you have to be careful about memory size.
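
For instance, the back-tick-style one-shot read mentioned above could look like this (a sketch; only appropriate when the whole output fits comfortably in memory):

from subprocess import Popen, PIPE

# read the entire decompressed output at once, like shell back-ticks
out = Popen(['gzip', '-dc', 'compressed_file.gz'], stdout=PIPE).communicate()[0]
print(len(out))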

And here's an example of writing records to the lp(1) program on Linux (Python 3 again):

cmd = 'lp -'
proc = Popen(cmd.split(), stdin=PIPE)  # again a list of arguments, not a plain string
proc.communicate(some_data.encode()) 
+0

That's the standard example you find everywhere. The point is that I don't want the input to come from another process, and I'd like to avoid writing all the input into memory before sending it to the consumer... Handing everything to proc.communicate would, of course, solve the problem... – 2011-05-06 14:08:37

0

Now I know this won't completely satisfy the purist in you, since the input has to fit in memory and you have no option to work interactively with input/output, but at least it works fine for your example. The communicate method optionally takes the input as an argument, and if you feed your process its input this way, it works.

import subprocess 

proc = subprocess.Popen(['cat','-'], 
         stdin=subprocess.PIPE, 
         stdout=subprocess.PIPE, 
         ) 

input = "".join('{0:d}\n'.format(i) for i in range(100000)) 
output = proc.communicate(input)[0] 
print output 

As for the larger problem, you could subclass Popen, rewrite __init__ to accept stream-like objects as the stdin/stdout/stderr arguments, and rewrite the _communicate method (hairy cross-platform; you'd need to do it twice, see the subprocess.py source) to call read() on the stdin stream and write() the output to the stdout and stderr streams. What bothers me about this approach is that, as far as I know, it hasn't been done yet. When an obvious thing hasn't been done before, there's usually a reason (it doesn't work as expected), but I can't think of why it shouldn't, apart from the fact that the streams need to be thread-safe on Windows.
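
To make the idea concrete without touching the private _communicate, here is a rough sketch (my own, built on public APIs and a helper thread rather than an actual Popen subclass) of a call that accepts stream-like objects for stdin and stdout:

import shutil
import subprocess
import threading

def popen_streams(args, stdin_stream, stdout_stream):
    # feed stdin from one file-like object and copy stdout into another,
    # using a thread for the writer so neither pipe can fill up and deadlock
    proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    def pump_in():
        shutil.copyfileobj(stdin_stream, proc.stdin)
        proc.stdin.close()
    t = threading.Thread(target=pump_in)
    t.start()
    shutil.copyfileobj(proc.stdout, stdout_stream)  # drain output in this thread
    t.join()
    return proc.wait()

Error handling and stderr are omitted; a full version would need the same treatment for stderr, which is where the cross-platform hairiness comes from.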

4

If you don't want to keep all the data in memory, you have to use select. E.g. something like this:

import subprocess 
from select import select 
import os 

proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) 

i = 0
while True:
    rlist, wlist, xlist = [proc.stdout], [], []
    if i < 100000:
        wlist.append(proc.stdin)
    rlist, wlist, xlist = select(rlist, wlist, xlist)
    if proc.stdout in rlist:
        out = os.read(proc.stdout.fileno(), 10)
        print out,
        if not out:
            break
    if proc.stdin in wlist:
        proc.stdin.write('%d\n' % i)
        i += 1
        if i >= 100000:
            proc.stdin.close()
+0

Yes, this would be the conceptually correct solution. Maybe a bit convoluted, but if Popen doesn't implement these patterns out of the box, this is how I would implement it... – 2011-05-06 15:51:09

+2

I don't think it's implemented out of the box because usually, when you need to resort to this, you also need fine-grained control over the poll/select loop. Have you checked the ['asyncore'](http://docs.python.org/library/asyncore.html) module? – 2011-05-06 16:14:00

+2

I found this interesting blog post: http://dcreager.net/2009/08/13/subprocess-callbacks/ – 2011-05-06 16:24:20

2

Here's what I used to load a 6 GB mysql dump file through subprocess. Stay away from shell=True: it's insecure, and it wastes resources by starting an extra process.

import subprocess 

cmd = [mysql_path,
       "-u", mysql_user, "-p" + mysql_pass,
       "-h", host, database]

fhandle = open(dump_file, 'r') 
p = subprocess.Popen(cmd, stdin=fhandle, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 

(stdout,stderr) = p.communicate() 

fhandle.close() 
0

Using aiofiles & asyncio in Python 3.5:

Slightly complicated, but you only need 1024 bytes of memory for writing to stdin!

import asyncio 
import aiofiles 
import sys 
from os.path import dirname, join, abspath 
import subprocess as sb 


THIS_DIR = abspath(dirname(__file__)) 
SAMPLE_FILE = join(THIS_DIR, '../src/hazelnut/tests/stuff/sample.mp4') 
DEST_PATH = '/home/vahid/Desktop/sample.mp4' 


async def async_file_reader(f, buffer):
    async for l in f:
        if l:
            # decode and strip the newline so the str.join below works
            buffer.append(l.decode().rstrip('\r\n'))
        else:
            break
    print('reader done')


async def async_file_writer(source_file, target_file):
    length = 0
    while True:
        input_chunk = await source_file.read(1024)
        if input_chunk:
            length += len(input_chunk)
            target_file.write(input_chunk)
            await target_file.drain()
        else:
            target_file.write_eof()
            break

    print('writer done: %s' % length)


async def main():
    dir_name = dirname(DEST_PATH)
    remote_cmd = 'ssh localhost mkdir -p %s && cat - > %s' % (dir_name, DEST_PATH)

    stdout, stderr = [], []
    async with aiofiles.open(SAMPLE_FILE, mode='rb') as f:
        cmd = await asyncio.create_subprocess_shell(
            remote_cmd,
            stdin=sb.PIPE,
            stdout=sb.PIPE,
            stderr=sb.PIPE,
        )

        await asyncio.gather(*(
            async_file_reader(cmd.stdout, stdout),
            async_file_reader(cmd.stderr, stderr),
            async_file_writer(f, cmd.stdin)
        ))

        print('EXIT STATUS: %s' % await cmd.wait())

    stdout, stderr = '\n'.join(stdout), '\n'.join(stderr)

    if stdout:
        print(stdout)

    if stderr:
        print(stderr, file=sys.stderr)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

The result:

writer done: 383631 
reader done 
reader done 
EXIT STATUS: 0 
1

Your code deadlocks as soon as cat's stdout OS pipe buffer is full. If you use stdout=PIPE, you have to consume the output in time, otherwise a deadlock may happen.

If you don't need the output while the process is running, you can redirect it to a temporary file:

#!/usr/bin/env python3 
import subprocess 
import tempfile 

with tempfile.TemporaryFile('r+') as output_file:
    with subprocess.Popen(['cat'],
                          stdin=subprocess.PIPE,
                          stdout=output_file,
                          universal_newlines=True) as process:
        for i in range(100000):
            print(i, file=process.stdin)
    output_file.seek(0)  # rewind (and sync with the disk)
    print(output_file.readline(), end='')  # get the first line of the output

If the input/output are small (they fit in memory), you can pass the input all at once and get the output in one go; .communicate() does the concurrent reading/writing for you:

#!/usr/bin/env python3 
import subprocess 

cp = subprocess.run(['cat'], input='\n'.join(['%d' % i for i in range(100000)]), 
        stdout=subprocess.PIPE, universal_newlines=True) 
print(cp.stdout.splitlines()[-1]) # print the last line 

To read/write concurrently by hand, you could use threads, asyncio, fcntl, etc. @Jed provided a simple thread-based solution. Here's an asyncio-based one:

#!/usr/bin/env python3 
import asyncio 
import sys 
from subprocess import PIPE 

async def pump_input(writer):
    try:
        for i in range(100000):
            writer.write(b'%d\n' % i)
            await writer.drain()
    finally:
        writer.close()

async def run():
    # start child process
    # NOTE: universal_newlines parameter is not supported
    process = await asyncio.create_subprocess_exec('cat', stdin=PIPE, stdout=PIPE)
    asyncio.ensure_future(pump_input(process.stdin))  # write input
    async for line in process.stdout:  # consume output
        print(int(line)**2)  # print squares
    return await process.wait()  # wait for the child process to exit


if sys.platform.startswith('win'): 
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows 
    asyncio.set_event_loop(loop) 
else: 
    loop = asyncio.get_event_loop() 
loop.run_until_complete(run()) 
loop.close() 

On Unix, you could use an fcntl-based solution:

#!/usr/bin/env python3 
import sys 
from fcntl import fcntl, F_GETFL, F_SETFL 
from os import O_NONBLOCK 
from shutil import copyfileobj 
from subprocess import Popen, PIPE, _PIPE_BUF as PIPE_BUF 

def make_blocking(pipe, blocking=True):
    fd = pipe.fileno()
    if not blocking:
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK)  # set O_NONBLOCK
    else:
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK)  # clear it


with Popen(['cat'], stdin=PIPE, stdout=PIPE) as process:
    make_blocking(process.stdout, blocking=False)
    with process.stdin:
        for i in range(100000):
            # NOTE: the mode is block-buffered (default) and therefore
            # `cat` won't see it immediately
            process.stdin.write(b'%d\n' % i)
            # a deadlock may happen here with a *blocking* pipe
            output = process.stdout.read(PIPE_BUF)
            if output is not None:
                sys.stdout.buffer.write(output)
    # read the rest
    make_blocking(process.stdout)
    copyfileobj(process.stdout, sys.stdout.buffer)
0

The simplest solution I can think of:

from subprocess import Popen, PIPE 
from threading import Thread 

s = "".join(map(str, xrange(10000)))  # a large string
p = Popen(['cat'], stdin=PIPE, stdout=PIPE, bufsize=1) 
Thread(target=lambda: any((p.stdin.write(b) for b in s)) or p.stdin.close()).start() 
print (p.stdout.read()) 

A buffered version:

from subprocess import Popen, PIPE 
from threading import Thread 

s = "".join(map(str, xrange(10000)))  # a large string
n = 1024 # buffer size 
p = Popen(['cat'], stdin=PIPE, stdout=PIPE, bufsize=n) 
Thread(target=lambda: any((p.stdin.write(c) for c in (s[i:i+n] for i in xrange(0, len(s), n)))) or p.stdin.close()).start() 
print (p.stdout.read()) 
0

I was looking for example code that iterates over a process's output incrementally, while the process consumes its input from a provided iterator (also incrementally). Basically:

import string
import random

# That's what I consider a very useful function, though didn't
# find any existing implementations.
def process_line_reader(args, stdin_lines):
    # args - command to run, same as subprocess.Popen
    # stdin_lines - iterable with lines to send to process stdin
    # returns - iterable with lines received from process stdout
    pass

# Returns iterable over n random strings. n is assumed to be infinity if negative.
# Just an example of a function that returns a potentially unlimited number of lines.
def random_lines(n, M=8):
    while 0 != n:
        yield "".join(random.choice(string.letters) for _ in range(M))
        if 0 < n:
            n -= 1

# That's what I consider to be a very convenient use case for
# the function proposed above.
def print_many_uniq_numbered_random_lines():
    i = 0
    for line in process_line_reader(["uniq", "-i"], random_lines(100500 * 9000)):
        # Key idea here is that `process_line_reader` will feed random lines into
        # `uniq` process stdin as lines are consumed from the returned iterable.
        print "#%i: %s" % (i, line)
        i += 1

Some of the solutions suggested here allow doing this with threads (which isn't always convenient) or with asyncio (which isn't available in Python 2.x). Below is an example of a working implementation that allows doing this.

import subprocess
import os
import errno
import fcntl
import select

class nonblocking_io(object):
    def __init__(self, f):
        self._fd = -1
        if type(f) is int:
            self._fd = os.dup(f)
            os.close(f)
        elif type(f) is file:
            self._fd = os.dup(f.fileno())
            f.close()
        else:
            raise TypeError("Only accept file objects or integer file descriptors")
        flag = fcntl.fcntl(self._fd, fcntl.F_GETFL)
        fcntl.fcntl(self._fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
    def __enter__(self):
        return self
    def __exit__(self, type, value, traceback):
        self.close()
        return False
    def fileno(self):
        return self._fd
    def close(self):
        if 0 <= self._fd:
            os.close(self._fd)
            self._fd = -1

class nonblocking_line_writer(nonblocking_io):
    def __init__(self, f, lines, autoclose=True, buffer_size=16*1024, encoding="utf-8", linesep=os.linesep):
        super(nonblocking_line_writer, self).__init__(f)
        self._lines = iter(lines)
        self._lines_ended = False
        self._autoclose = autoclose
        self._buffer_size = buffer_size
        self._buffer_offset = 0
        self._buffer = bytearray()
        self._encoding = encoding
        self._linesep = bytearray(linesep, encoding)
    # Returns False when `lines` iterable is exhausted and all pending data is written
    def continue_writing(self):
        while True:
            if self._buffer_offset < len(self._buffer):
                n = os.write(self._fd, self._buffer[self._buffer_offset:])
                self._buffer_offset += n
                if self._buffer_offset < len(self._buffer):
                    return True
            if self._lines_ended:
                if self._autoclose:
                    self.close()
                return False
            self._buffer[:] = []
            self._buffer_offset = 0
            while len(self._buffer) < self._buffer_size:
                line = next(self._lines, None)
                if line is None:
                    self._lines_ended = True
                    break
                self._buffer.extend(bytearray(line, self._encoding))
                self._buffer.extend(self._linesep)

class nonblocking_line_reader(nonblocking_io):
    def __init__(self, f, autoclose=True, buffer_size=16*1024, encoding="utf-8"):
        super(nonblocking_line_reader, self).__init__(f)
        self._autoclose = autoclose
        self._buffer_size = buffer_size
        self._encoding = encoding
        self._file_ended = False
        self._line_part = ""
    # Returns (lines, more) tuple, where lines is iterable with lines read and more will
    # be set to False after EOF.
    def continue_reading(self):
        lines = []
        while not self._file_ended:
            data = os.read(self._fd, self._buffer_size)
            if 0 == len(data):
                self._file_ended = True
                if self._autoclose:
                    self.close()
                if 0 < len(self._line_part):
                    lines.append(self._line_part.decode(self._encoding))
                    self._line_part = ""
                break
            for line in data.splitlines(True):
                self._line_part += line
                if self._line_part.endswith(("\n", "\r")):
                    lines.append(self._line_part.decode(self._encoding).rstrip("\n\r"))
                    self._line_part = ""
            if len(data) < self._buffer_size:
                break
        return (lines, not self._file_ended)

class process_line_reader(object):
    def __init__(self, args, stdin_lines):
        self._p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self._reader = nonblocking_line_reader(self._p.stdout)
        self._writer = nonblocking_line_writer(self._p.stdin, stdin_lines)
        self._iterator = self._communicate()
    def __iter__(self):
        return self._iterator
    def __enter__(self):
        return self._iterator
    def __exit__(self, type, value, traceback):
        self.close()
        return False
    def _communicate(self):
        read_set = [self._reader]
        write_set = [self._writer]
        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(read_set, write_set, [])
            except select.error, e:
                if e.args[0] == errno.EINTR:
                    continue
                raise
            if self._reader in rlist:
                stdout_lines, more = self._reader.continue_reading()
                for line in stdout_lines:
                    yield line
                if not more:
                    read_set.remove(self._reader)
            if self._writer in wlist:
                if not self._writer.continue_writing():
                    write_set.remove(self._writer)
        self.close()
    def lines(self):
        return self._iterator
    def close(self):
        if self._iterator is not None:
            self._reader.close()
            self._writer.close()
            self._p.wait()
            self._iterator = None