2016-07-25 60 views
6

将参数传递到气流中的相关任务的方式是什么?我有很多bashes文件,我试图将这种方法迁移到气流,但我不知道如何在任务之间传递一些属性。将气流参数传递给相关任务

这是一个真实的例子:

#sqoop bash template 
sqoop_template = """ 
     sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/ 
    """ 

s3_template = """ 
     s3-dist-cp --src= {{params.dir}} "--dest={{params.s3}} 
    """ 



#Task of extraction in EMR 
t1 = BashOperator(
     task_id='extract_account', 
     bash_command=sqoop_template, 
     params={'job': 'job', 'dir': 'hdfs:///account/' + time.now().strftime("%Y-%m-%d-%H-%M-%S")}, 
     dag=dag) 
#Task to upload in s3 backup. 
t2 = BashOperator(
     task_id='s3_upload', 
     bash_command=s3_template, 
     params={}, #here i need the dir name created in t1 
     depends_on_past=True 
    ) 

t2.set_upstream(t1) 

在T2我需要访问t1中创建的目录名。

解决方案

#Execute a valid job sqoop 
def sqoop_import(table_name, job_name): 
    s3, hdfs = dirpath(table_name) 
    sqoop_job = job_default_config(job_name, hdfs) 
    #call(sqoop_job) 
    return {'hdfs_dir': hdfs, 's3_dir': s3} 

def s3_upload(**context): 
    hdfs = context['task_instance'].xcom_pull(task_ids='sqoop_import')['hdfs_dir'] 
    s3 = context['task_instance'].xcom_pull(task_ids='sqoop_import')['s3_dir'] 
    s3_cpdist_job = ["s3-dist-cp", "--src=%s" % (hdfs), "--dest=%s" % (s3)] 
    #call(s3_cpdist_job) 
    return {'s3_dir': s3} #context['task_instance'].xcom_pull(task_ids='sqoop_import') 

def sns_notify(**context): 
    s3 = context['task_instance'].xcom_pull(task_ids='distcp_s3')['s3_dir'] 
    client = boto3.client('sns') 
    arn = 'arn:aws:sns:us-east-1:744617668409:pipeline-notification-stg' 
    response = client.publish(TargetArn=arn, Message=s3) 
    return response 

这不是是明确的解决方案,所以改善是值得欢迎的。谢谢。

+0

在我看来,一种解决方案是用t1中创建的属性创建一个文件,并在t2中使用该文件。 –

回答