我通过python的子进程(特别是在AWS EC2上)在Linux机器上启动进程,该进程会生成大量文件。我需要“tail -f”这些文件,并将每个结果的jsonified输出发送到它们各自的AWS SQS队列。我将如何去做这样的任务?将文件拖入消息队列
编辑
正如所建议的这个答案,asyncproc,并PEP3145,我可以用下面这样做:
from asyncproc import Process
import Queue
import os
import time
# Substitute AWS SQS for Queue
sta_queue = Queue.Queue()
msg_queue = Queue.Queue()
running_procs = {'status':(Process(['/usr/bin/tail', '--retry', '-f','test.sta']),sta_queue),'message':(Process(['/usr/bin/tail', '--retry', '-f', 'test.msg' ]),msg_queue)}
def handle_proc(p,q):
latest = p.read()
if latest:
# If nothing new, latest will be an empty string
q.put(latest)
retcode = p.wait(flags=os.WNOHANG)
return retcode
while len(running_procs):
proc_names = running_procs.keys()
for proc_name in proc_names:
proc, q = running_procs[proc_name]
retcode = handle_proc(proc, q)
if retcode is not None: # Process finished.
del running_procs[proc_name]
time.sleep(1.0)
print("Status queue")
while not sta_queue.empty():
print(sta_queue.get())
print("Message queue")
while not msg_queue.empty():
print(msg_queue.get())
这应该是足够了,我想,除非其他人可以提供更好的答案。
多个编辑
我的得太多问题。虽然上述方法很好,但我认为最简单的解决方案是: - 检查是否存在文件 - 如果文件存在,请将它们复制到AWS S3上的存储桶并通过AWS SQS发送文件已复制的消息。重复每60秒 - 消费者应用程序投票SQS,并最终收到文件已被复制的消息 - 消费者应用程序从S3下载文件并用最新的内容替换以前的内容。重复,直到工作完成
尽管子进程中的异步IO的整个问题仍然是一个问题。
您确定不想使用FIFO或FD直接加入进程吗? – 2011-04-20 01:04:26