Python版本2.7.x和气流1.5.1版
我DAG的脚本是这样的
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'Vignesh',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['[email protected]'],
'schedule_interval':timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing', default_args=default_args)
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
从,你可以看到我正在创建一个带有6个任务的DAG第一个任务(Start1)首先启动,之后所有其他五个任务开始
目前我已经给DAG的首发
它已经完全跑了所有六个任务的第一种类型,但五分钟后DAG不重新启动
至今已有然后1之间的时间5分钟延迟小时仍然DAG不重新启动我真的不知道我是错的。
如果有人能指出我有什么不对,那我真的很高兴。我尝试用airflow testing clear
清除,然后发生同样的事情。它跑了一次然后就站在那里。
在命令行中显示的唯一事情是Getting all instance for DAG testing
当我改变schedule_interval只是它与任何计划间隔parallel.That是在5分钟内300个或更多的任务实例已完成运行的位置。有NO 5分钟的调度时间间隔
代码2:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'Vignesh',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
感谢维涅什,
所以你说它会每五秒运行一次,直到执行日期为当前日期ti我之后它会按照预定的时间间隔 – The6thSense
是的,这就是我的意思。 – Yongyiw
非常感谢,但我有两个疑惑。我怎么能安排一个任务从这秒开始,时间间隔为一个小时。我可以安排一个未来的工作 – The6thSense