1
A
回答
1
可能最好使用PythonOperator
来逐行处理文件。我有一个用于轮询和SFTP服务器文件的用例,当我找到一些时,我逐行处理它们,并将结果写成JSON。我不喜欢的东西解析日期为YYYY-MM-DD格式等这样的事情可能为你工作:
def csv_file_to_kafka(**context):
f = '/path/to/downloaded/csv_file.csv'
csvfile = open(f, 'r')
reader = csv.DictReader(csvfile)
for row in reader:
"""
Send the row to Kafka
"""
return
csv_file_to_kafka = PythonOperator(
task_id='csv_file_to_kafka',
python_callable=csv_file_to_kafka,
dag=dag
)
现在,它真的取决于你将如何得到要下载的文件。在我的情况下,我使用SSHHook
和GoogleCloudStorageHook
从SFTP服务器获取文件,然后将这些文件的名称传递给解析和清理csv文件的任务。我通过SFTP拉低文件,并把它们放入谷歌云存储做到这一点:
"""
HOOKS: Connections to external systems
"""
def sftp_connection():
"""
Returns an SFTP connection created using the SSHHook
"""
ssh_hook = SSHHook(ssh_conn_id='sftp_connection')
ssh_client = ssh_hook.get_conn()
return ssh_client.open_sftp()
def gcs_connection():
"""
Returns an GCP connection created using the GoogleCloudStorageHook
"""
return GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcs_connection')
"""
PYTHON CALLABLES: Called by PythonOperators
"""
def get_files(**context):
"""
Looks at all files on the FTP server and returns a list files.
"""
sftp_client = sftp_connection()
all_files = sftp_client.listdir('/path/to/files/')
files = []
for f in all_files:
files.append(f)
return files
def save_files(**context):
"""
Looks to see if a file already exists in GCS. If not, the file is downloaed
from SFTP server and uploaded to GCS. A list of
"""
files = context['task_instance'].xcom_pull(task_ids='get_files')
sftp_client = sftp_connection()
gcs = gcs_connection()
new_files = []
new_outcomes_files = []
new_si_files = []
new_files = process_sftp_files(files, gcs, sftp_client)
return new_files
def csv_file_to_kafka(**context):
"""
Untested sample parse csv files and send to kafka
"""
files = context['task_instance'].xcom_pull(task_ids='save_files')
for f in new_files:
csvfile = open(f, 'r')
reader = csv.DictReader(csvfile)
for row in reader:
"""
Send the row to Kafka
"""
return
get_files = PythonOperator(
task_id='get_files',
python_callable=get_files,
dag=dag
)
save_files = PythonOperator(
task_id='save_files',
python_callable=save_files,
dag=dag
)
csv_file_to_kafka = PythonOperator(
task_id='csv_file_to_kafka',
python_callable=csv_file_to_kafka,
dag=dag
)
我知道我可以做到这一切在一个大蟒蛇可调用的,这就是我现在如何重构代码,以便在可调用。因此,它轮询SFTP服务器,提取最新的文件,并根据我的规则解析它们,所有这些都在一个Python函数中。我听说使用XCom并不理想,据推测,Airflow任务不应该彼此交流太多。
根据您的使用情况,您甚至可能想要探索Apache Nifi之类的东西,我现在也正在研究它。
相关问题
- 1. 将kafka(kafka-python)转储到txt文件
- 2. 是否可以使用Kafka传输文件?
- 3. 将DBF文件传输到EXCEL到PHP
- 4. 使用按钮将值传输到文本文件中php
- 5. 将日志文件kafka移动到hadoop
- 6. 使用TCP传输文件
- 7. 使用FTP传输文件
- 8. 使用FTP传输文件
- 9. 使用c#传输文件
- 10. 使用python传输文件
- 11. 使用WCF传输文件
- 12. 将大量大文件传输到s3
- 13. 将CSV文件从iPhone传输到MacBook
- 14. 自动将csv文件传输到MySQL
- 15. 将回声传送到输出文件
- 16. 将文件传输到Windows服务
- 17. 将文件传输到Tomcat位置
- 18. 如何将.frm文件传输到表?
- 19. 将文件传输到消息代理
- 20. 将100个Excel文件传输到MySQL
- 21. 如何使用java将文件从url传输到ftp?
- 22. 使用SNMP将文件从代理传输到管理器?
- 23. 如何使用ubuntu将文件传输到安卓android 14.04
- 24. 使用pscp将文件从windows传输到linux包装盒
- 25. 如何使用SSIS包将文件传输到SFTP路径?
- 26. 使用gstreamer将本地mpeg-ts文件流式传输到udp
- 27. 如何使用ssh将文件从Windows传输到Linux?
- 28. 使用Windows机器将.ipa文件传输到iPad中
- 29. 我可以使用Apache NIFI将文件传输到HDFS系统。
- 30. 使用WCF将2-10MB文件传输到非.Net客户端
你是真的将这些文件加载到文件中,还是将它们加入到文件中?气流确实支持配料/微配料,但对于流媒体来说,我的经验表明它不是太好,基本上就像_nano_-batching。我对远程主机上的CSV文件进行了大量轮询,并将它们作为批次拉入BigQuery中。 – Mike
我逐行处理它们并将每行发送到kafka。 – bsd