2011-04-20 81 views
2

我通过python的子进程(特别是在AWS EC2上)在Linux机器上启动进程,该进程会生成大量文件。我需要“tail -f”这些文件,并将每个结果的jsonified输出发送到它们各自的AWS SQS队列。我将如何去做这样的任务?将文件拖入消息队列

编辑

正如所建议的这个答案,asyncproc,并PEP3145,我可以用下面这样做:

from asyncproc import Process 
import Queue 
import os 
import time 

# Substitute AWS SQS for Queue 
sta_queue = Queue.Queue() 
msg_queue = Queue.Queue() 
running_procs = {'status':(Process(['/usr/bin/tail', '--retry', '-f','test.sta']),sta_queue),'message':(Process(['/usr/bin/tail', '--retry', '-f', 'test.msg' ]),msg_queue)} 

def handle_proc(p,q): 
    latest = p.read() 
    if latest: 
     # If nothing new, latest will be an empty string 
     q.put(latest) 
    retcode = p.wait(flags=os.WNOHANG) 
    return retcode 

while len(running_procs): 
    proc_names = running_procs.keys() 
    for proc_name in proc_names: 
     proc, q = running_procs[proc_name] 
     retcode = handle_proc(proc, q) 
     if retcode is not None: # Process finished. 
      del running_procs[proc_name] 
    time.sleep(1.0) 
print("Status queue") 
while not sta_queue.empty(): 
    print(sta_queue.get()) 
print("Message queue") 
while not msg_queue.empty(): 
    print(msg_queue.get()) 

这应该是足够了,我想,除非其他人可以提供更好的答案。

多个编辑

我的得太多问题。虽然上述方法很好,但我认为最简单的解决方案是: - 检查是否存在文件 - 如果文件存在,请将它们复制到AWS S3上的存储桶并通过AWS SQS发送文件已复制的消息。重复每60秒 - 消费者应用程序投票SQS,并最终收到文件已被复制的消息 - 消费者应用程序从S3下载文件并用最新的内容替换以前的内容。重复,直到工作完成

尽管子进程中的异步IO的整个问题仍然是一个问题。

+0

您确定不想使用FIFO或FD直接加入进程吗? – 2011-04-20 01:04:26

回答

0

您可以使用subprocess.Popen类运行尾部并读取其输出。

try: 
    process = subprocess.Popen(['tail', '-f', filename], stdout=PIPE) 
except (OSError, ValueError): 
    pass # TODO: handle errors 
output = process.stdout.read() 

subprocess.check_output函数以单行提供此功能。这是Python 2.7版本中的新功能。

try: 
    output = subprocess.check_output(['tail', '-f', filename], stdout=PIPE) 
except CalledProcessError: 
    pass # TODO: handle errors 

对于非阻塞I/O,请参见this question

+0

会不会阻止? – user90855 2011-04-20 13:23:53

+0

是的,这个例子会阻塞。但是,它可以在不使用subprocess.Popen阻塞的情况下完成。例如,Popen.poll()让你知道进程是否已经终止而没有阻塞。如果在进程终止后执行读取操作,则不会阻塞。 – 2011-04-20 13:55:34

+0

确实如此,但是整个观点是“tail -f”部分,它在主进程完成(几天后)完成后才会终止,直到实例关闭。 – user90855 2011-04-20 17:20:20