2017-05-15 36 views
0

我一直在研究Apache Airflow以安排工作流程。我似乎遇到了安排我的DAG的问题。我一直在使用这太问题以供参考:Airflow not scheduling Correctly Python运行气流DAG的问题

from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from datetime import datetime 
from datetime import timedelta 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
'start_date': datetime.now() - timedelta(minutes=5), 
'email': ['[email protected]'], 
'email_on_failure': False, 
'email_on_retry': False, 
'retries': 1, 
'retry_delay': timedelta(minutes=5), 
} 

dag = DAG('dag_mkdir_folder', default_args=default_args, 
     schedule_interval=timedelta(minutes=5)) 


task_hello = BashOperator(task_id='print_hello', 
         bash_command='mkdir test_airflow', dag=dag) 

我尝试使用下面的命令列表运行任务:

airflow scheduler 
airflow trigger_dag dag_mkdir_folder 

我不断收到这是一个错误:

 
[2017-05-15 13:49:06,688] {models.py:322} DagFileProcessor406 INFO -  Finding 'running' jobs without a recent heartbeat 
[2017-05-15 13:49:06,689] {models.py:328} DagFileProcessor406 INFO - Failing jobs without heartbeat after 2017-05-15 13:44:06.689284 

bash命令只是应该创建一个新的目录。测试版本正常工作。

+0

你看到了什么后,你输入“气流调度”? – Lisa

回答

0

运行不同的终端上的调度程序,然后触发DAG在另一端

也尽量要对该目录提供完整路径。 例如在气流目录中创建文件夹:

task_hello = BashOperator(task_id='print_hello', 
        bash_command="mkdir ~/airflow/test_airflow", dag=dag) 

这应该创建一个内部气流test_airflow文件夹

0

您当前bash_command告诉气流中产生的DAG使用的临时目录中的目录时,它是运行,在DAG运行后将其全部内容一起吹走。

考虑更改目录要在创建它的目录

一个bash_command与看起来像:

"cd <path_to_directory>; mkdir test_airflow"