apache-airflow

    1热度

    1回答

    函数我想执行一个函数,我从任务传递一个参数。 这里是我的功能与状态参数: def sns_notify(state): client = boto3.client('sns') if state == "failed": message = config.get('sns', 'message') + state else: message =

    1热度

    1回答

    我试图通过传递一个不起作用的Bash行(thisshouldnotrun)来故意排除故障并排除错误。气流正在输出以下内容: [2017-06-15 17:44:17,869] {bash_operator.py:94} INFO - /tmp/airflowtmpLFTMX7/run_bashm2MEsS: line 7: thisshouldnotrun: command not found

    0热度

    2回答

    如何配置Airflow,以便DAG中的任何故障将(立即)导致松弛消息? 此时此刻我通过创建一个slack_failed_task对其进行管理: slack_failed_task = SlackAPIPostOperator( task_id='slack_failed', channel="#datalabs", trigger_rule='one_failed',

    2热度

    1回答

    我的想法是有一个任务foo,它生成输入列表(用户,报告,日志文件等),并为输入列表中的每个元素启动任务。目标是利用Airflow的重试和其他逻辑,而不是重新实现它。 所以,理想情况下,我应该DAG看起来是这样的: 这里唯一的变量是生成的任务数。在完成所有这些任务之后,我想做更多的任务,因此为每项任务启动新的DAG似乎并不合适。 这是我的代码: default_args = { 'owne

    0热度

    1回答

    我不明白我需要运行哪些命令才能获得DAG预定。假设我使用airflow test dag_name task_id_1 2017-06-22测试了DAG,第二项任务使用了airflow test dag_name task_id_2 2017-06-22。 我跑airflow trigger_dag dag_name,但那是为了实例化DAG恰好那一刻吗? 比方说,我想dag_name的定时/调度的

    2热度

    2回答

    我想在不与Airflow GUI交互的情况下创建S3连接。有没有可能通过airflow.cfg或命令行? 我们正在使用AWS的作用,下面的连接参数为我们工作: { “aws_account_id”: “XXXX”, “role_arn”: “YYYYY”} 所以,手动创建的GUI为S3连接工作,现在我们希望自动执行此流程,并希望将其添加为Airflow部署流程的一部分。任何工作?

    1热度

    1回答

    我是新来的气流和意外启动的守护程序模式下的气流调度程序。现在,我想杀死调度器并可能重新启动它。我试着做 sudo kill -9 <list of pids> pkill <name> 什么都没发生。当我运行 ps aux | grep 'airflow scheduler' 我看到这些项: user1 2907 6.0 1.0 329788 62996 ? Sl 17:37

    1热度

    2回答

    我有一个dag检查FTP服务器上的文件(气流运行在不同的服务器上)。如果存在文件,则文件被移到S3(我们在这里存档)。从那里,文件名被传递给Spark提交作业。火花作业将通过S3(不同服务器上的Spark集群)处理文件。我不确定是否需要有多个dag,但这里是流程。我想要做的只是在S3存储桶中存在文件时才运行Spark作业。 我尝试使用S3传感器,但它符合超时标准后失败/超时,因此整个DAG设置为失

    1热度

    2回答

    气流示例的DAG保持在UI我在配置文件关闭后还是后load_examples = False保持表示实施例的DAG。 该系统通知的DAG是不存在于DAG文件夹,但它们仍然在UI因为调度程序将其标记在元数据数据库作为活性。 我知道从那里删除它们将直接删除数据库中的这些行单程不过关当然,这并不ideal.How我应该着手删除这些UI DAGs的?

    1热度

    1回答

    当我回填特定日期的DAG,我想通过依次运行它,也就是我希望它一天 完成所有任务的特定日期未来运行一天,然后一天等等..我已经使用了depends_on_past参数,但它只是帮助我设置依赖任务而不是在DAG运行。 例如: - Dag_A有4个任务,我用回用depends_on_past填写, 在Dag_A(第一天),它触发Dag_A(第二日)的第一个任务执行的第一个任务后,我不希望它