2017-07-14 91 views
0

我们有一个直接与DAG API(DagBag()get_dag()然后dag_run()如何将参数提交给Airflow中的Python程序?

的DAG运行正常,问题是,我们能不能找到一种方法与执行这样的DAG执行的DAG前端服务器具体参数。

最接近的解决方案是使用变量API,它使用set()get()方法,但这些变量是全局变量,并且在使用相同变量名称的并发操作中工作时可能会发生冲突。

我们如何运行一个dag并设置可用的参数来执行它?我们主要使用PythonOperator。

编辑1: 我们的程序是一个Python Django前端服务器。所以,我们正在通过另一个Python程序与Airflow进行交流。这意味着我们通过Python触发dags,因此,使用DagBag.get_dag()从airflow服务检索信息。 run_dag()没有办法通过直接的参数虽然

回答

0

如果使用trigger_dag_run(通过命令行或从另一DAG)触发DAG,你可以通过任何JSON作为有效载荷。

另一种选择是将参数列表存储在文件中,并将该文件的位置存储为变量。然后DAG可以将该文件位置传递给python运算符,然后运算符可以处理读取该文件并从中解析参数。

如果这两个解决方案对您的用例不起作用,提供有关您的dag和参数类型的更多详细信息可能会有所帮助。

+0

我编辑过这篇文章,我们使用了django前端,因此触发器是由另一个python构成的。理想情况下,我们想传递Python参数(字典,列表等)。我们如何使用trigger_dag_run并传递一个有效载荷?如果它是一个JSON有效载荷,它意味着它可以很容易地成为一个Python字典。这已经足够了。对于第二种选择,我们如何传递给dag文件路径?如果其他用户使用不同的参数触发相同的DAG,则它必须工作。 – Saif

+0

对于您的用例,您是否查看了airflow中的json_client.py文件(它提供了实验休息api)。你可以在POST请求中提供'conf'参数,它基本上是任何json对象。如果这能解决您的问题,我也会将其添加到答案中。 https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/api/client/json_client.py – Him

相关问题