2014-10-02 58 views
1

我有一个大的列表L来操作。令f()是对L进行操作的函数。f()需要另一个变量,每15分钟过期并需要更新。下面是一个例子,在串行:受到定时器的多处理器

def main(): 
    L = openList() 
    # START THE CLOCK 
    clockStart = dt.datetime.now() 
    clockExp = clockStart + dt.timedelta(seconds=900) 
    a = getRenewed() 
    for item in L: 
     f(item, a) # operate on item given a 
     # CHECK TIME REMAINING 
     clockCur = dt.datetime.now() 
     clockRem = (clockExp - clockCur).total_seconds() 
     # RENEW a IF NEEDED 
     if clockRem < 5: # renew with 5 seconds left 
      clockStart = dt.datetime.now() 
      clockExp = clockStart + dt.timedelta(seconds=900) 
      a = getRenewed() 

因为f()需要几秒钟(或更长有时),我想并行化代码。给定计时器的任何提示怎么做?我设想共享clockExp和“a”,当进程满足clockRem < 5时,它调用getRenewed()并共享新的“a”和clockExp,然后重复。

+0

“f”或“getRenewed”依赖任何特定于进程的状态,还是只依赖(或修改)外部状态? – Blckknght 2014-10-02 22:27:20

+0

f下载一个网站,通过L. getRenewed中的项目口述得到一个认证令牌。 – Kevin 2014-10-02 23:37:14

回答

3

如果getRenewed是幂等的(也就是说,您可以多次调用它,而没有副作用),您可以简单地将现有的计时器代码移动到工作进程中,并让他们在注意到自己的计时器撞倒。这只需要从您传递在列表中的项目同步,并multiprocessing.Pool可以处理足够容易:

def setup_worker(): 
    global clockExp, a 

    clockStart = dt.datetime.now() 
    clockExp = clockStart + dt.timedelta(seconds=900) 
    a = getRenewed() 

def worker(item): 
    global clockExp, a 

    clockCur = dt.datetime.now() 
    clockRem = (clockExp - clockCur).total_seconds() 

    if clockRem < 5: # renew with 5 seconds left 
     clockStart = dt.datetime.now() 
     clockExp = clockStart + dt.timedelta(seconds=900) 
     a = getRenewed() 

    f(item, a) 

def main(L): 
    pool = multiprocessing.Pool(initializer=setup_worker) 

    pool.map(worker, L) 

如果getRenewed不幂等,事情将需要更复杂些。您不能在每个工作进程中调用它,因此您需要在进程之间设置某种通信方法,以便每个进程在可用时都可以获得最新版本。

我建议使用multiprocessing.queuea值从主进程传递给工人。您仍然可以使用Pool作为列表项,您只需确保在主进程中异步使用它。与此类似,也许:

def setup_worker2(queue): 
    global x 
    x = random.random() 
    global a_queue, a, clockExp 

    a_queue = queue 
    a = a_queue.get() # wait for the first `a` value 
    clockStart = dt.datetime.now() 
    clockExp = clockStart + dt.timedelta(seconds=900) 

def worker2(item): 
    global a, clockExp 

    clockCur = dt.datetime.now() 
    clockRem = (clockExp - clockCur).total_seconds() 
    if clockRem < 60: # start checking for a new `a` value 60 seconds before its needed 
     try: 
      a = a_queue.get_nowait() 
      clockStart = dt.datetime.now() 
      clockExp = clockStart + dt.timedelta(seconds=900) 
     except queue.Empty: 
      pass 

    return f(item, a) 

def main2(L): 
    queue = multiprocessing.Queue()  # setup the queue for the a values 

    pool = multiprocessing.Pool(initializer=setup_worker2, initargs=(queue,)) 

    result = pool.map_async(worker2, L) # send the items to the pool asynchronously 

    while True:     # loop for sending a values through the queue 
     a = getRenewed()   # get a new item 
     for _ in range(os.cpu_count()): 
      queue.put(a)   # send one copy per worker process 

     try: 
      result.wait(900-5) # sleep for ~15 minutes, or until the result is ready 
     except multiprocessing.TimeoutError: 
      pass     # if we got a timeout, keep looping! 
     else: 
      break     # if not, we are done, so break out of the loop! 

工人们仍然需要有有一些计时代码,否则你面临的竞争条件,其中一个工人可能会消耗两个单发下来队列中a值批量来自主流程。如果对f的某些调用比其他调用慢得多(如果涉及从网上下载东西的话可能很可能会发生这种情况)。

+0

感谢您的提示。事实证明,我可以拥有多个活动令牌,所以第一个解决方案可以工作! – Kevin 2014-10-04 18:34:43