2017-10-16 477 views
1

使用气流将CSV文件传输到kafka主题的最佳方法是什么?使用airflow将文件传输到kafka

写一个自定义的运算符用于气流?

+0

你是真的将这些文件加载​​到文件中,还是将它们加入到文件中?气流确实支持配料/微配料,但对于流媒体来说,我的经验表明它不是太好,基本上就像_nano_-batching。我对远程主机上的CSV文件进行了大量轮询,并将它们作为批次拉入BigQuery中。 – Mike

+0

我逐行处理它们并将每行发送到kafka。 – bsd

回答

1

可能最好使用PythonOperator来逐行处理文件。我有一个用于轮询和SFTP服务器文件的用例,当我找到一些时,我逐行处理它们,并将结果写成JSON。我不喜欢的东西解析日期为YYYY-MM-DD格式等这样的事情可能为你工作:

def csv_file_to_kafka(**context): 

    f = '/path/to/downloaded/csv_file.csv' 
    csvfile = open(f, 'r') 
    reader = csv.DictReader(csvfile) 

    for row in reader: 
     """ 
     Send the row to Kafka 
     """ 
    return 

csv_file_to_kafka = PythonOperator(
    task_id='csv_file_to_kafka', 
    python_callable=csv_file_to_kafka, 
    dag=dag 
) 

现在,它真的取决于你将如何得到要下载的文件。在我的情况下,我使用SSHHookGoogleCloudStorageHook从SFTP服务器获取文件,然后将这些文件的名称传递给解析和清理csv文件的任务。我通过SFTP拉低文件,并把它们放入谷歌云存储做到这一点:

""" 
HOOKS: Connections to external systems 
""" 
def sftp_connection(): 
    """ 
    Returns an SFTP connection created using the SSHHook 
    """ 
    ssh_hook = SSHHook(ssh_conn_id='sftp_connection') 
    ssh_client = ssh_hook.get_conn() 
    return ssh_client.open_sftp() 
def gcs_connection(): 
    """ 
    Returns an GCP connection created using the GoogleCloudStorageHook 
    """ 
    return GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcs_connection') 

""" 
PYTHON CALLABLES: Called by PythonOperators 
""" 
def get_files(**context): 
    """ 
    Looks at all files on the FTP server and returns a list files. 
    """ 
    sftp_client = sftp_connection() 
    all_files = sftp_client.listdir('/path/to/files/') 
    files = [] 

    for f in all_files: 
     files.append(f) 

    return files 

def save_files(**context): 
    """ 
    Looks to see if a file already exists in GCS. If not, the file is downloaed 
    from SFTP server and uploaded to GCS. A list of 
    """ 
    files = context['task_instance'].xcom_pull(task_ids='get_files') 

    sftp_client = sftp_connection() 
    gcs = gcs_connection() 
    new_files = [] 
    new_outcomes_files = [] 
    new_si_files = [] 

    new_files = process_sftp_files(files, gcs, sftp_client) 

    return new_files 

def csv_file_to_kafka(**context): 
    """ 
    Untested sample parse csv files and send to kafka 
    """ 
    files = context['task_instance'].xcom_pull(task_ids='save_files') 
    for f in new_files: 
     csvfile = open(f, 'r') 
     reader = csv.DictReader(csvfile) 

     for row in reader: 
      """ 
      Send the row to Kafka 
      """ 
    return 

get_files = PythonOperator(
    task_id='get_files', 
    python_callable=get_files, 
    dag=dag 
) 
save_files = PythonOperator(
    task_id='save_files', 
    python_callable=save_files, 
    dag=dag 
) 
csv_file_to_kafka = PythonOperator(
    task_id='csv_file_to_kafka', 
    python_callable=csv_file_to_kafka, 
    dag=dag 
) 

我知道我可以做到这一切在一个大蟒蛇可调用的,这就是我现在如何重构代码,以便在可调用。因此,它轮询SFTP服务器,提取最新的文件,并根据我的规则解析它们,所有这些都在一个Python函数中。我听说使用XCom并不理想,据推测,Airflow任务不应该彼此交流太多。

根据您的使用情况,您甚至可能想要探索Apache Nifi之类的东西,我现在也正在研究它。