2015-10-14 59 views
8

代码:气流不调度正确的Python

Python版本2.7.x和气流1.5.1版

我DAG的脚本是这样的

from airflow import DAG 
from airflow.operators import BashOperator 
from datetime import datetime, timedelta 


default_args = { 
'owner': 'Vignesh', 
'depends_on_past': False, 
'start_date': datetime(2015,10,13), 
'email': ['[email protected]'], 
'schedule_interval':timedelta(minutes=5), 
'email_on_failure': True, 
'email_on_retry': True, 
'retries': 1, 
'retry_delay': timedelta(minutes=5), 
} 
dag = DAG('testing', default_args=default_args) 
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag) 
for i in range(5): 
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag) 
    t.set_upstream(run_this_first) 

从,你可以看到我正在创建一个带有6个任务的DAG第一个任务(Start1)首先启动,之后所有其他五个任务开始

目前我已经给DAG的首发

它已经完全跑了所有六个任务的第一种类型,但五分钟后DAG不重新启动

至今已有然后1之间的时间5分钟延迟小时仍然DAG不重新启动我真的不知道我是错的。

如果有人能指出我有什么不对,那我真的很高兴。我尝试用airflow testing clear清除,然后发生同样的事情。它跑了一次然后就站在那里。

在命令行中显示的唯一事情是Getting all instance for DAG testing

当我改变schedule_interval只是它与任何计划间隔parallel.That是在5分钟内300个或更多的任务实例已完成运行的位置。有NO 5分钟的调度时间间隔

代码2:

from airflow import DAG 
from airflow.operators import BashOperator 
from datetime import datetime, timedelta 


default_args = { 
'owner': 'Vignesh', 
'depends_on_past': False, 
'start_date': datetime(2015,10,13), 
'email': ['[email protected]'], 
'email_on_failure': True, 
'email_on_retry': True, 
'retries': 1, 
'retry_delay': timedelta(minutes=5), 
} 
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here 
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag) 
for i in range(5): 
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag) 
    t.set_upstream(run_this_first) 

感谢维涅什,

回答

4

代码2,我猜测为什么它运行每分钟的原因是:

  1. 开始时间为2015-10-13 00:00

  2. 程序间隔为5分钟

  3. 调度的每一次心跳(缺省为5秒),您的DAG将被选中

    • 首先检查:开始日期(没有最后执行日期发现)+调度 当前时间间隔<?如果是,则将执行DAG并记录最后的执行时间 。 (例如:2015-10-13 00:00 + 5min < current?)
    • 第二次检查下次心跳:上次执行时间+调度程序 区间<当前时间?如果是的话,DAG将被再次执行。
    • ....

将该溶液设置DAG作为起始日期datetime.now() - schedule_interval

和如果要调试:

  1. settings.py中的LOGGINGLEVEL设置为debug

  2. 修改类方法的airflow.models.TaskInstanceis_queueable()

def is_queueable(self, flag_upstream_failed=False): 
    logging.debug('Checking whether task instance is queueable or not!') 
    if self.execution_date > datetime.now() - self.task.schedule_interval: 
     logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(self.execution_date, self.task.schedule_interval, datetime.now())) 
     return False 
     ... 
+0

所以你说它会每五秒运行一次,直到执行日期为当前日期ti我之后它会按照预定的时间间隔 – The6thSense

+0

是的,这就是我的意思。 – Yongyiw

+0

非常感谢,但我有两个疑惑。我怎么能安排一个任务从这秒开始,时间间隔为一个小时。我可以安排一个未来的工作 – The6thSense

3

由于开始时间(2015-10-13 00:00)小于现在时间,它会触发气流backfill。它将从2015-10-13 00:00开始,当气流调度程序检测到每秒钟(它的开始日期),但执行日期在5分钟(任务间隔时间)之间。

查看日志名称:

$tree airflow/logs/testing/ 
testing/ 
|-- Orders10 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
|-- Orders11 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
|-- Orders12 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
|-- Orders13 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
|-- Orders14 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
-- Start1 
    |-- 2015-10-13T00:00:00 
    |-- 2015-10-13T00:05:00 
    |-- 2015-10-13T00:10:00 
    -- 2015-10-13T00:15:00 

见日志的创建时间:

$ll airflow/logs/testing/Start1 
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:00:00 
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:05:00 
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:51 2015-10-13T00:10:00 
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:52 2015-10-13T00:15:00 

此外,你可以看到在网络用户界面任务实例:

air flow Task Instances

+0

是的,你是对的 – The6thSense