我有一堆长时间运行的进程,我想分成多个进程。这部分我可以没有问题。我碰到的问题有时这些进程进入挂起状态。为了解决这个问题,我希望能够为一个进程正在处理的每个任务设置一个时间阈值。当超过时间阈值时,我想重新启动或终止任务。Python多重处理 - 终止/重启工作进程
最初我的代码非常简单,使用一个进程池,但是对于池我无法弄清楚如何检索池中的进程,从来不知道如何重新启动/终止池中的进程。
我已转向使用队列和处理对象作为在该示例(https://pymotw.com/2/multiprocessing/communication.html#passing-messages-to-processes与一些变化中示出。
我尝试算出这个是在下面的代码,在它的当前状态的过程中没有实际上终止了,除此之外,我无法弄清楚在当前任务结束后如何让程序转到下一个任务上,任何建议/帮助表示赞赏,或许我会以错误的方式回答这个问题
谢谢
import multiprocess
import time
class Consumer(multiprocess.Process):
def __init__(self, task_queue, result_queue, startTimes, name=None):
multiprocess.Process.__init__(self)
if name:
self.name = name
print 'created process: {0}'.format(self.name)
self.task_queue = task_queue
self.result_queue = result_queue
self.startTimes = startTimes
def stopProcess(self):
elapseTime = time.time() - self.startTimes[self.name]
print 'killing process {0} {1}'.format(self.name, elapseTime)
self.task_queue.cancel_join_thread()
self.terminate()
# now want to get the process to start procesing another job
def run(self):
'''
The process subclass calls this on a separate process.
'''
proc_name = self.name
print proc_name
while True:
# pulling the next task off the queue and starting it
# on the current process.
task = self.task_queue.get()
self.task_queue.cancel_join_thread()
if task is None:
# Poison pill means shutdown
#print '%s: Exiting' % proc_name
self.task_queue.task_done()
break
self.startTimes[proc_name] = time.time()
answer = task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a, b, startTimes):
self.a = a
self.b = b
self.startTimes = startTimes
self.taskName = 'taskName_{0}_{1}'.format(self.a, self.b)
def __call__(self):
import time
import os
print 'new job in process pid:', os.getpid(), self.taskName
if self.a == 2:
time.sleep(20000) # simulate a hung process
else:
time.sleep(3) # pretend to take some time to do the work
return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
def __str__(self):
return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
# Establish communication queues
# tasks = this is the work queue and results is for results or completed work
tasks = multiprocess.JoinableQueue()
results = multiprocess.Queue()
#parentPipe, childPipe = multiprocess.Pipe(duplex=True)
mgr = multiprocess.Manager()
startTimes = mgr.dict()
# Start consumers
numberOfProcesses = 4
processObjs = []
for processNumber in range(numberOfProcesses):
processObj = Consumer(tasks, results, startTimes)
processObjs.append(processObj)
for process in processObjs:
process.start()
# Enqueue jobs
num_jobs = 30
for i in range(num_jobs):
tasks.put(Task(i, i + 1, startTimes))
# Add a poison pill for each process object
for i in range(numberOfProcesses):
tasks.put(None)
# process monitor loop,
killProcesses = {}
executing = True
while executing:
allDead = True
for process in processObjs:
name = process.name
#status = consumer.status.getStatusString()
status = process.is_alive()
pid = process.ident
elapsedTime = 0
if name in startTimes:
elapsedTime = time.time() - startTimes[name]
if elapsedTime > 10:
process.stopProcess()
print "{0} - {1} - {2} - {3}".format(name, status, pid, elapsedTime)
if allDead and status:
allDead = False
if allDead:
executing = False
time.sleep(3)
# Wait for all of the tasks to finish
#tasks.join()
# Start printing results
while num_jobs:
result = results.get()
print 'Result:', result
num_jobs -= 1
你可以减少你的代码到[mcve]。目前它太复杂了。 –
阅读关于[multiprocessing.html#synchronization-primitives](https://docs.python.org/3.6/library/multiprocessing.html#synchronization-primitives),Section _ ** multiprocessing.Event ** _。 – stovfl