
apache airflow scheduler not scheduling jobs

I am using Apache Airflow 1.8.0.

Here is the output when I backfill the job.
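For context, the backfill was kicked off with a command along these lines (the exact date range is an assumption inferred from the execution dates in the log below):

airflow backfill example_bash_operator -s 2017-04-13T13:43:00 -e 2017-04-13T13:46:00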

[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]> 
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]> 
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]> 
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]> 
[2017-04-13 09:42:55,864] {models.py:1120} INFO - Dependencies not met for <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 3 non-success(es). upstream_tasks_state={'skipped': Decimal('0'), 'successes': Decimal('0'), 'done': 0, 'upstream_failed': Decimal('0'), 'failed': Decimal('0')}, upstream_task_ids=['runme_0', 'runme_1', 'runme_2'] 

And when I try to schedule any DAG, it throws this error:

Traceback (most recent call last): 
  File "/anaconda3/bin/airflow", line 28, in <module> 
    args.func(args) 
  File "/anaconda3/lib/python3.5/site-packages/airflow/bin/cli.py", line 167, in backfill 
    pool=args.pool) 
  File "/anaconda3/lib/python3.5/site-packages/airflow/models.py", line 3330, in run 
    job.run() 
  File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 200, in run 
    self._execute() 
  File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 2021, in _execute 
    raise AirflowException(err) 
airflow.exceptions.AirflowException: --------------------------------------------------- 

Here is the output it prints about the tasks:

BackfillJob is deadlocked. These tasks have succeeded: 
set() 
These tasks have started: 
{} 
These tasks have failed: 
set() 
These tasks are skipped: 
set() 
These tasks are deadlocked: 
{<TaskInstance: example_bash_operator.runme_0 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:44:00 [scheduled]>} 

Tested with Python 2.7 and Python 3.5.

Tried with both SequentialExecutor and LocalExecutor.

P.S. If I backfill the DAG at the current time, it executes once and then throws the above error for all the scheduled tasks.

Answer


Your Airflow instance is in a deadlocked state. Failed tasks are blocking future task executions.

Airflow launches each task of each DAG run in a new process, and when tasks fail to come up, this deadlock situation is not handled automatically.

To resolve this, you can do one of the following:

  1. Use **airflow clear** <<dag_id>>. This will resolve the deadlock and allow future runs of the DAG/tasks.
  2. If the above does not fix the issue, use **airflow resetdb**. This will clear the Airflow database and hence resolve the issue. A rough command sketch follows this list.
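For reference, on Airflow 1.8 these commands look roughly like the following (example_bash_operator stands in for your own DAG id; note that resetdb drops and rebuilds the whole metadata database, so all run history is lost, and it asks for confirmation before proceeding):

airflow clear example_bash_operator
airflow resetdb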

Going forward:

  • Try setting a timeout with execution_timeout=timedelta(minutes=2) so that you have explicit control over your operators.
  • Also, provide an on_failure_callback=handle_failure so the operator exits cleanly on failure. A minimal sketch combining both follows this list.
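Putting both suggestions together, a minimal sketch of what this could look like in an Airflow 1.8 DAG (the DAG id, schedule, and the body of handle_failure are assumptions for illustration, not part of the original question):

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

def handle_failure(context):
    # hypothetical callback: log the failing task instance so the
    # failure is handled cleanly instead of stalling the DAG run
    print("Task failed: %s" % context['task_instance_key_str'])

dag = DAG('example_with_timeout',            # assumed DAG id
          start_date=datetime(2017, 4, 13),
          schedule_interval='@daily')

runme = BashOperator(
    task_id='runme',
    bash_command='echo 1',
    execution_timeout=timedelta(minutes=2),  # hard cap on task runtime
    on_failure_callback=handle_failure,      # clean exit on failure
    dag=dag)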

Hope this helps,

Cheers!