我们有一个直接与DAG API(DagBag()
,get_dag()
然后dag_run()
)如何将参数提交给Airflow中的Python程序?
的DAG运行正常,问题是,我们能不能找到一种方法与执行这样的DAG执行的DAG前端服务器具体参数。
最接近的解决方案是使用变量API,它使用set()
和get()
方法,但这些变量是全局变量,并且在使用相同变量名称的并发操作中工作时可能会发生冲突。
我们如何运行一个dag并设置可用的参数来执行它?我们主要使用PythonOperator。
编辑1: 我们的程序是一个Python Django前端服务器。所以,我们正在通过另一个Python程序与Airflow进行交流。这意味着我们通过Python触发dags,因此,使用DagBag.get_dag()
从airflow服务检索信息。 run_dag()
没有办法通过直接的参数虽然
我编辑过这篇文章,我们使用了django前端,因此触发器是由另一个python构成的。理想情况下,我们想传递Python参数(字典,列表等)。我们如何使用trigger_dag_run并传递一个有效载荷?如果它是一个JSON有效载荷,它意味着它可以很容易地成为一个Python字典。这已经足够了。对于第二种选择,我们如何传递给dag文件路径?如果其他用户使用不同的参数触发相同的DAG,则它必须工作。 – Saif
对于您的用例,您是否查看了airflow中的json_client.py文件(它提供了实验休息api)。你可以在POST请求中提供'conf'参数,它基本上是任何json对象。如果这能解决您的问题,我也会将其添加到答案中。 https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/api/client/json_client.py – Him