如果getRenewed
是幂等的(也就是说,您可以多次调用它,而没有副作用),您可以简单地将现有的计时器代码移动到工作进程中,并让他们在注意到自己的计时器撞倒。这只需要从您传递在列表中的项目同步,并multiprocessing.Pool
可以处理足够容易:
def setup_worker():
global clockExp, a
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
a = getRenewed()
def worker(item):
global clockExp, a
clockCur = dt.datetime.now()
clockRem = (clockExp - clockCur).total_seconds()
if clockRem < 5: # renew with 5 seconds left
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
a = getRenewed()
f(item, a)
def main(L):
pool = multiprocessing.Pool(initializer=setup_worker)
pool.map(worker, L)
如果getRenewed
不幂等,事情将需要更复杂些。您不能在每个工作进程中调用它,因此您需要在进程之间设置某种通信方法,以便每个进程在可用时都可以获得最新版本。
我建议使用multiprocessing.queue
将a
值从主进程传递给工人。您仍然可以使用Pool
作为列表项,您只需确保在主进程中异步使用它。与此类似,也许:
def setup_worker2(queue):
global x
x = random.random()
global a_queue, a, clockExp
a_queue = queue
a = a_queue.get() # wait for the first `a` value
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
def worker2(item):
global a, clockExp
clockCur = dt.datetime.now()
clockRem = (clockExp - clockCur).total_seconds()
if clockRem < 60: # start checking for a new `a` value 60 seconds before its needed
try:
a = a_queue.get_nowait()
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
except queue.Empty:
pass
return f(item, a)
def main2(L):
queue = multiprocessing.Queue() # setup the queue for the a values
pool = multiprocessing.Pool(initializer=setup_worker2, initargs=(queue,))
result = pool.map_async(worker2, L) # send the items to the pool asynchronously
while True: # loop for sending a values through the queue
a = getRenewed() # get a new item
for _ in range(os.cpu_count()):
queue.put(a) # send one copy per worker process
try:
result.wait(900-5) # sleep for ~15 minutes, or until the result is ready
except multiprocessing.TimeoutError:
pass # if we got a timeout, keep looping!
else:
break # if not, we are done, so break out of the loop!
工人们仍然需要有有一些计时代码,否则你面临的竞争条件,其中一个工人可能会消耗两个单发下来队列中a
值批量来自主流程。如果对f
的某些调用比其他调用慢得多(如果涉及从网上下载东西的话可能很可能会发生这种情况)。
“f”或“getRenewed”依赖任何特定于进程的状态,还是只依赖(或修改)外部状态? – Blckknght 2014-10-02 22:27:20
f下载一个网站,通过L. getRenewed中的项目口述得到一个认证令牌。 – Kevin 2014-10-02 23:37:14