2017-08-31 94 views
0

我遇到与下面这种芹菜工作流程的一些非常奇怪的行为链:芹菜帆布组传递参数太多组成任务

workflow = group(
    chain(task1.s(), task2.s()), 
    chain(task3.s(), task4.s()), 
) 

这是在Django的背景下。

当我调用工作流程如下:

workflow.apply_async((n,))

...对于n的任意整数值,在每个链(task1task3)第一个任务将会失败,像以下一个TypeError (从celery events拍摄):

args: [9, 8, 7, 5, 4, 3] 
    kwargs: {} 
    retries: 0 
    exception: TypeError('task1() takes exactly 1 argument (6 given)',) 
    state: FAILURE 

后的第一个参数是始终工作流之前用所谓的论据。所以,在这个例子中,我在这个时候调用了workflow.apply_async((9,)),其他的数字是以前通过的值。每一次,传递给task1task3的恶意论据都是一样的。

我很想试试把它作为一个错误报告发给芹菜,但我还不确定这个错误是不是我的。

事情我已经排除了:

  • 我肯定传递参数我想我传递给workflow.apply_async。我已经单独构建并记录了我传递的元组,以确保这一点。
  • 这与将一个列表(即可变)传递给apply_async而不是元组无关。我肯定会传递一个元组(即不可变)。

的只有适度的不寻常的事情对我的设置,虽然我不能看到它是如何连接的,它是task1task3与不同的队列配置。

回答

0

曾经碰到过类似的问题,当我与芹菜task.chunks工作()

我具有包含到一个单一的元组的产品清单解决它。例如,

假设任务log_i()是shared_task基本上记录变量i,我希望通过分块我会做记录所有i s的名单 -

# log_i shared Task 
@shared_task(bind=True) 
def log_i(self, i): 
    logger.info(i) 
    return i 

而且

# some calling site 
# ... 
very_long_list = [{"i1": i, "i2": i+i, "i3": i**2} for i in range(1000)] 
res = log_i.chunks(zip(very_long_list), 10)() 
print(res.get()) 

# ... 

自我提醒,这样做的东西,如 -

# ... 
res = log_i.chunks(very_long_list, 10)() 
# ... 

将失败,并显示错误信息,当列表中的项目不是可迭代项目时。

压缩将项目按原样移动到新元组中,通过此功能,您可以将其捕获到log_i任务中的单个参数中。