如果条件不符合,我想显式地使任务失败。例如: @app.task(bind=True, name="task.my_task", max_retries=2)
def my_task(self, filename):
result = get_result(filename)
if result is None:
self.update_state(task_id
我知道这将被视为重复,但我有环顾四周问这个问题之前,但所有的问题似乎是过期或不符合我的问题在所有帮助。这是我写这个问题之前看了: How do you unit test a Celery task?(5岁,全死链接) How to unit test code that runs celery tasks?(2岁) How do I capture Celery tasks during
我正在尝试对每日运行的芹菜任务运行单元测试。 我试过导入函数并在我的测试中调用它,但这不起作用。 的任务是: @shared_task
def create_a_notification_if_a_product_is_in_or_out_of_season():
"""
Send a notification if a product is now in or out o