0
我正在尝试使用Airflow来替换我们现有的cron编排,并且所有内容都看起来很有希望。我已经成功安装并获得了一份计划和执行的数据,但是我注意到他们在我指定的每项任务(至少15分钟到60分钟)之间存在显着的延迟。气流dag bash任务滞后于远程执行
我的DAG定义如下
我缺少的东西,使他们在其他以后运行一个吧?
我不使用芹菜 两个调度器和Web服务器在同一主机 上运行,并且是 - 需要调用一个远程执行(某种形式的地方,直到然后在工作) ,没有不能遥控器上安装气流服务器 Dag应该在UTC时间的凌晨1点每天运行一次,并遵循我给定的任务的设置路径。
import airflow from builtins import range from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.models import DAG from datetime import datetime, timedelta args = { 'owner': 'user1', 'depends_on_past': False, 'start_date': airflow.utils.dates.days_ago(2), 'email': ['[email protected]'], 'email_on_failure': True, 'email_on_retry': False, 'wait_for_downstream': True, 'schedule_interval': None, 'depends_on_past': True, 'retries': 1, 'retry_delay': timedelta(minutes=5) } dag = DAG( dag_id='airflow_pt1' , default_args=args , schedule_interval='0 1 * * *' , dagrun_timeout=timedelta(hours=8)) task1 = BashOperator( task_id='task1' , bash_command='ssh [email protected] /path/to/remote/execution/script_task1.sh' , dag=dag,env=None, output_encoding='utf-8') task2 = BashOperator( task_id='task2' , bash_command='ssh [email protected] /path/to/remote/execution/script_task2.sh' , dag=dag,env=None, output_encoding='utf-8') task3 = BashOperator( task_id='task3' , bash_command='ssh [email protected] /path/to/remote/execution/script_task3.sh' , dag=dag,env=None, output_encoding='utf-8') task4 = BashOperator( task_id='task4' , bash_command='ssh [email protected] /path/to/remote/execution/script_task4.sh' , dag=dag,env=None, output_encoding='utf-8') task2.set_upstream(task1) task3.set_upstream(task1) task4.set_upstream(task2)
注意我没有执行气流回填(是那么重要吗?)
我不确定'directed-acyclic-graphs'标签在这里是否合适 - 图论中的主题专业知识不会帮助某人回答这个问题,因此,在其Feed中跟随该标签的人可能发现这个问题不是他们感兴趣的,除非他们*也跟随'airflow'。 –
顺便说一句,你确定这些任务实际上是在你期望它们完成时退出吗?你检查过程表吗? –
我有任务监视器上捕获的日志/标准输出很好地显示。我很好奇,如果我需要多个工作人员为调度程序排队后续任务 –