我已经运行下面的代码PySpark:为什么在使用saveAsTextFile时,在Google Dataproc中运行的Spark将临时文件存储在外部存储(GCS)而不是本地磁盘或HDFS上?
from pyspark import SparkContext
sc = SparkContext()
data = sc.textFile('gs://bucket-name/input_blob_path')
sorted_data = data.sortBy(lambda x: sort_criteria(x))
sorted_data.saveAsTextFile(
'gs://bucket-name/output_blob_path',
compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec"
)
工作顺利完成。但是,在作业执行期间,Spark在以下路径gs://bucket-name/output_blob_path/_temporary/0/
中创建了许多临时Blob。我意识到,在这段时间内,除去所有这些临时blob占用了一半的作业执行时间,并且在此期间CPU利用率为1%(巨大的资源浪费)。
有没有办法将临时文件存储在本地驱动器(或HDFS)而不是GCP?我仍然希望将最终结果(排序后的数据集)保存到GCP。
我们使用Dataproc Spark集群(VM类型16核,60GM)和10个工作节点。输入数据量为10TB。
感谢您的解释。自从我们对从BigQuery导出到GCS的数据进行排序后,我感到有点惊讶。我的假设是BiqQuery导出功能已经优化了分区数量(在GCS上存储数据集的最佳文件数量)。 – user2548047
根据所应用的RDD操作的种类,转换后的分区数量可能与输入分区数量不同,并且在这种情况下,FileInputFormat将默认将输入文件分割成更小的分区,无论如何独立于输入文件的数量。你可以使用'--properties spark.hadoop.fs.gs.block.size = 536870912'来调整它,例如增加到512MB,而不是默认的64MB。 –
您可能还想在集群部署时默认进行调整。如果您的工作通常在10TB的范围内,'gcloud数据集群创建my-cluster --properties core:fs.gs.block.size = 536870912'将是合理的。如果你的工作只有10GB,那就太高了。在大多数情况下,瞄准超过1000个,小于50000个分区是很好的选择,但即使对于小型工作,通常也不希望达到小于64MB的小块。 –