2017-08-03 52 views
1

我有一个情况定期每月big_task在这个文件中,其中链接的任务是small_task_1small_task_2读取每行一个链接任务文件和排队:如何监控芹菜中的一组任务?

class BigTask(PeriodicTask): 

    run_every = crontab(hour=00, minute=00, day_of_month=1) 

    def run(self): 
     task_list = [] 
     with open("the_file.csv" as f: 
      for row in f: 
       t = chain(
         small_task_1.s(row), 
         small_task_2.s(), 
        ) 
       task_list.append(t) 
      gr = group(*task_list) 
      r = gr.apply_async() 

我想获得有关统计每个small_task每当完成所有这些任务时(无论状态如何),都将向队列项目管理员发送汇总电子邮件,并且已排队,失败的任务(以及有关例外的详细信息)的数量。

我首先想到使用chord,但callback不会执行,如果任何headers任务失败,这肯定会发生在我的情况。

我也可以在BigTask中使用r.get(),非常方便,但不推荐将任务结果等待到另一个任务中(即使这里,我猜工人死锁的风险很差,因为任务只会执行一次月)。

重要提示:输入文件包含〜700k行。

您会如何推荐继续?

+0

芹菜版? – ItayB

+0

我目前使用的是3.1.25 – stellasia

回答

1

我不确定它是否可以帮助您监控,但有关chordcallback问题,您可以使用link_error回调(用于捕获异常)。在你的情况,例如,你可以使用它像:

small_task_1.s(row).set(link_error=error_task)) 

和实施芹菜error_task是送你的通知或什么的。

在芹菜4,你可以为所有的画布设置一次(但它并没有在3.1为我工作):

r = gr.apply_async(link_error=error_task) 

对于监控的一部分,你可以使用,当然flower

希望能够帮助

编辑:另一种(不使用额外的持久性)将捕获异常,并添加一些逻辑的结果和回调。例如:

def small_task_1(): 
    try: 
     // do stuff 
     return 'success', result 
    except: 
     return 'fail', result 

然后在您的回调任务中迭代结果元组并检查失败,因为执行实际的逻辑。

+0

每个任务都会调用'link_error',然后我可以简单地使用'CELERY_SEND_TASK_ERROR_EMAILS',但是我期望有几百个任务失败,很难分析。花是伟大的,但问题是,我将不得不定期检查它,以知道任务何时完成。 – stellasia

+0

@stellasia我认为后面的例子(在芹菜4中)将被调用一次 - 但我会用替代方法编辑我的答案。 – ItayB

+1

我喜欢你最后的解决方案,会试试看。谢谢! – stellasia

0

我们用于一个部分类似问题的解决方案,其中芹菜内置的东西(任务状态等)并未真正切割它,而是在Redis中手动存储所需的信息并在需要时检索它们。

+0

感谢您的参观!我可能会得到这样一个解决方案来获得有关不同例外的统计信息。然而,我的主要问题是找到一种方法来确定“何时需要”,我的是“什么时候完成所有任务” – stellasia