我想使用Airflow执行一个简单的任务python。气流 - Python文件不在同一个DAG文件夹中
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}
dag = DAG(dag_id='python_test', default_args=args)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=print_context,
dag=dag)
,如果我尝试,例如:
airflow test python_test print 2015-01-01
它的工作原理!
现在我想把我的def print_context(ds, **kwargs)
函数放在其他python文件中。所以,我创建了一个名为antoher文件:simple_test.py和变化:
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=simple_test.print_context,
dag=dag)
现在我再次尝试运行:
airflow test python_test print 2015-01-01
和OK!它仍然工作!
但是,如果我创建一个模块,例如,工作模块与文件SimplePython.py
,进口(from worker import SimplePython
),并尝试:
airflow test python_test print 2015-01-01
它给人的消息:
ImportError: No module named worker
的问题:
- 是否可以在DAG定义中导入模块?
- Airflow + Celery如何将所有必需的Python源文件分布在工作节点上?