2017-06-06 96 views
0

我正在尝试使用Airflow来替换我们现有的cron编排,并且所有内容都看起来很有希望。我已经成功安装并获得了一份计划和执行的数据,但是我注意到他们在我指定的每项任务(至少15分钟到60分钟)之间存在显着的延迟。气流dag bash任务滞后于远程执行

我的DAG定义如下

我缺少的东西,使他们在其他以后运行一个吧?

我不使用芹菜 两个调度器和Web服务器在同一主机 上运行,并且是 - 需要调用一个远程执行(某种形式的地方,直到然后在工作) ,没有不能遥控器上安装气流服务器 Dag应该在UTC时间的凌晨1点每天运行一次,并遵循我给定的任务的设置路径。

import airflow 
from builtins import range 
from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from airflow.models import DAG 
from datetime import datetime, timedelta 

args = { 
    'owner': 'user1', 
    'depends_on_past': False, 
    'start_date': airflow.utils.dates.days_ago(2), 
    'email': ['[email protected]'], 
    'email_on_failure': True, 
    'email_on_retry': False, 
    'wait_for_downstream': True, 
    'schedule_interval': None, 
    'depends_on_past': True, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5) 
} 

dag = DAG(
     dag_id='airflow_pt1' 
    , default_args=args 
    , schedule_interval='0 1 * * *' 
    , dagrun_timeout=timedelta(hours=8)) 

task1 = BashOperator(
     task_id='task1' 
    , bash_command='ssh [email protected] /path/to/remote/execution/script_task1.sh' 
    , dag=dag,env=None, output_encoding='utf-8') 

task2 = BashOperator(
     task_id='task2' 
    , bash_command='ssh [email protected] /path/to/remote/execution/script_task2.sh' 
    , dag=dag,env=None, output_encoding='utf-8') 

task3 = BashOperator(
     task_id='task3' 
    , bash_command='ssh [email protected] /path/to/remote/execution/script_task3.sh' 
    , dag=dag,env=None, output_encoding='utf-8') 

task4 = BashOperator(
     task_id='task4' 
    , bash_command='ssh [email protected] /path/to/remote/execution/script_task4.sh' 
    , dag=dag,env=None, output_encoding='utf-8') 

task2.set_upstream(task1) 
task3.set_upstream(task1) 
task4.set_upstream(task2) 

注意我没有执行气流回填(是那么重要吗?)

+0

我不确定'directed-acyclic-graphs'标签在这里是否合适 - 图论中的主题专业知识不会帮助某人回答这个问题,因此,在其Feed中跟随该标签的人可能发现这个问题不是他们感兴趣的,除非他们*也跟随'airflow'。 –

+0

顺便说一句,你确定这些任务实际上是在你期望它们完成时退出吗?你检查过程表吗? –

+0

我有任务监视器上捕获的日志/标准输出很好地显示。我很好奇,如果我需要多个工作人员为调度程序排队后续任务 –

回答