2015-11-03 82 views
4

我想使用Airflow执行一个简单的任务python。气流 - Python文件不在同一个DAG文件夹中

from __future__ import print_function 
from airflow.operators.python_operator import PythonOperator 
from airflow.models import DAG 
from datetime import datetime, timedelta 


from pprint import pprint 

seven_days_ago = datetime.combine(datetime.today() - timedelta(7), 
            datetime.min.time()) 

args = { 
    'owner': 'airflow', 
    'start_date': seven_days_ago, 
} 

dag = DAG(dag_id='python_test', default_args=args) 


def print_context(ds, **kwargs): 
    pprint(kwargs) 
    print(ds) 
    return 'Whatever you return gets printed in the logs' 

run_this = PythonOperator(
    task_id='print', 
    provide_context=True, 
    python_callable=print_context, 
    dag=dag) 

,如果我尝试,例如:

airflow test python_test print 2015-01-01

它的工作原理!

现在我想把我的def print_context(ds, **kwargs)函数放在其他python文件中。所以,我创建了一个名为antoher文件:simple_test.py和变化:

run_this = PythonOperator(
    task_id='print', 
    provide_context=True, 
    python_callable=simple_test.print_context, 
    dag=dag) 

现在我再次尝试运行:

airflow test python_test print 2015-01-01

和OK!它仍然工作!

但是,如果我创建一个模块,例如,工作模块与文件SimplePython.py,进口(from worker import SimplePython),并尝试:

airflow test python_test print 2015-01-01

它给人的消息:

ImportError: No module named worker

的问题:

  1. 是否可以在DAG定义中导入模块?
  2. Airflow + Celery如何将所有必需的Python源文件分布在工作节点上?

回答

0

对于你的第一个问题,这是可能的。

我猜你应该建立在同一目录下,命名为__init__.pySimplePython.py一个空文件(这是worker目录你的情况)。通过这样做worker目录将被视为一个python模块。

然后在您的DAG定义中,尝试from worker.SimplePython import print_context

在你的情况下,我想如果你为气流写一个插件会更好,因为你可能想升级气流核心项目而不删除你的定制功能。

0

对于第二个问题:Airflow + Celery如何将所有必需的Python源文件分布在工作节点上?

来自文档:工作人员需要访问其DAGS_FOLDER,并且您需要按照自己的意思同步文件系统。一个常见的设置是将您的DAGS_FOLDER存储在Git存储库中,并使用Chef,Puppet,Ansible或用于在您的环境中配置计算机的任何机器跨计算机进行同步。如果所有的箱子都有一个共同的挂载点,让您的管道文件共享应该工作以及

http://pythonhosted.org/airflow/installation.html?highlight=chef

4

您可以打包DAG的依赖性按:

https://pythonhosted.org/airflow/concepts.html?highlight=zip#packaged-dags

To allow this you can create a zip file that contains the dag(s) in the root of the zip file and have the extra modules unpacked in directories. For instance you can create a zip file that looks like this:

my_dag1.py 
my_dag2.py 
package1/__init__.py 
package1/functions.py 

Airflow will scan the zip file and try to load my_dag1.py and my_dag2.py. It will not go into subdirectories as these are considered to be potential packages.

当使用CeleryExe cutor,你需要手动同步DAG目录,气流不照顾,对你:

https://pythonhosted.org/airflow/configuration.html#scaling-out-with-celery

The worker needs to have access to its DAGS_FOLDER, and you need to synchronize the filesystems by your own means

0

虽然打包的DAG到所涵盖的文档拉链是唯一支持的解决方案我已经看到,您还可以执行dags文件夹内的模块导入。如果您使用其他工具(如puppet & git)自动同步dags文件夹,这非常有用。

我不是从这个问题您的目录结构清晰,所以这里是根据典型的Python项目结构的例子DAG的文件夹:

└── airflow/dags # root airflow dags folder where all dags live 
    └── my_dags # git repo project root 
     ├── my_dags # python src root (usually named same as project) 
     │   ├── my_test_globals.py # file I want to import 
     │   ├── dag_in_package.py 
     │ └── dags 
     │  └── dag_in_subpackage.py 
     ├── README.md # also setup.py, LICENSE, etc here 
     └── dag_in_project_root.py 

我已经离开了(需要[1])__init__.py文件。请注意三个示例dag的位置。你几乎肯定会只使用这些地方中的一个来满足你的所有需求。为了举例,我将它们都包括在内,因为这对进口应该没有关系。为了从其中任何一个导入my_test_globals

from my_dags.my_dags import my_test_globals 

我认为,这意味着气流负荷的DAG文件夹作为一个Python包的每个子目录。在我的情况下,这是额外的中间项目根目录阻碍了典型的包内绝对导入。因此,我们可以重组该气流项目是这样的:

└── airflow/dags # root airflow dags folder where all dags live 
    └── my_dags # git repo project root & python src root 
     ├── my_test_globals.py # file I want to import 
     ├── dag_in_package.py 
     ├── dags 
     │ └── dag_in_subpackage.py 
     ├── README.md # also setup.py, LICENSE, etc here 
     └── dag_in_project_root.py 

使进口看起来我们希望他们:

from my_dags import my_test_globals 
相关问题