2013-04-23 47 views
2

我想在它们到达时迭代ipython并行映射的一些异步结果。我能找到的唯一方法是迭代结果对象。但是,如果其中一项任务引发异常,则迭代终止。有没有办法做到这一点?请参阅下面的代码,迭代在第二个作业引发异常时终止。在等待下一个ipython并行映射结果时处理异常

from IPython import parallel 

def throw_even(i): 
    if i % 2 == 0: 
     raise RuntimeError('ERROR: %d' % i) 
    return i 

rc = parallel.Client() 
lview = rc.load_balanced_view() # default load-balanced view 

# map onto the engines. 
args = range(1, 5) 
print args 
async_results = lview.map_async(throw_even, range(1, 5), ordered=True) 

# get results 
args_iter = iter(args) 
results_iter = iter(async_results) 
while True: 
    try: 
     arg = args_iter.next() 
     result = results_iter.next() 
     print 'Job %s completed: %d' % (arg, result)    
    except StopIteration: 
     print 'Finished iteration' 
     break 
    except Exception as e: 
     print '%s: Job %d: %s' % (type(e), arg, e) 

给出了下面的输出作业3之前停止,4报告

[1, 2, 3, 4] 
Job 1 completed: 1 
<class 'IPython.parallel.error.RemoteError'>: Job 2: RuntimeError(ERROR: 2) 
Finished iteration 

是否有某种方式来做到这一点?

+0

我意识到,地图习语不是一个合适的方式来做到这一点。我最好使用lview.apply并单独处理每个结果。 – John 2013-04-24 09:13:10

回答

0

这个question可能是相关的。不过,我不明白为什么要从远程引擎中抛出异常。虽然,如果你确实想这样做,我认为你可以用我回答提到的问题的方式来做到这一点。我现在看到你已经在你的评论中意识到了这一点,但无论如何这应该做到这一点。

def throw_even(i): 
    if i%2: 
     return i 
    raise(RuntimeError('Error %d'%i) 

params = range(1,5) 

n_cores = len(c.ids) 
for n,p in enumerate(params): 
    core = c.ids[n%n_cores] 
    calls.append(c[core].apply_async(throw_even, p)) 

#then you get the results 

while calls != []: 
    for c in calls: 
     try: 
      result = c.get(1e-3) 
      print(result[0]) 
      calls.remove(c) 
      #in the case your call failed, you can apply_async again. 
      # and append the call to calls. 
     except parallel.TimeoutError: 
      pass 
     except Exception as e: 
      knock_yourself_out(e) 
+1

并不总是你要在远程引擎上引发异常,这是因为你的代码/数据发现了新的和有趣的方式来打破远程引擎;)当你无法取回500多个结果时,这非常烦人因为他们中有7人有棘手的数据。 – tacaswell 2013-11-15 04:21:57

+0

当然,为每个参数创建一个不同的视图应该保持异常封装。 – 2013-11-18 10:06:53

0

解决这个偷偷摸摸的是达到到内部的AsyncMapResult的抢_result这是一个结果列表。这不直接帮助你,但只是事后:

tt = async_results._results 
fail_indx = [j for j, r in enumerate(tt) if isinstance(r, IPython.parallel.error.RemoteError)] 
good_indx = [j for j, r in enumerate(tt) if not isinstance(r, IPython.parallel.error.RemoteError)] 

just_the_results = [r for r in tt if not isinstance(r, IPython.parallel.error.RemoteError)]