我试图使用Spark 1.6.1
将parquet
文件写入Amazon S3
。我生成的小parquet
是一次写入~2GB
,所以它不是那么多的数据。我试图证明Spark
作为我可以使用的平台。使用Spark在s3a上向s3写入实木复合地址文件非常慢
基本上我正在设置一个star schema
与dataframes
,然后我将这些表写出来实木复合地板。数据来自供应商提供的csv文件,我使用Spark作为ETL
平台。我目前在ec2(r3.2xlarge)
有3个节点的集群,所以120GB
的执行内存和总共16个内核。
输入文件总计大约22GB,现在我正在提取大约2GB的数据。最终,当我开始加载完整数据集时,这将会有很多TB。
这里是我的火花/斯卡拉pseudocode
:
def loadStage(): Unit = {
sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
var sqlCtx = new SQLContext(sc)
val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")
//Setup header table/df
val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6)))
val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
header.registerTempTable("header")
sqlCtx.cacheTable("header")
//Setup fact table/df
val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
val df = sqlCtx.createDataFrame(records, factSchema)
df.registerTempTable("fact")
val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")
println(results.count())
results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")
}
计大约需要2分钟465884512行。到地板的写操作38分钟
我明白3210做了洗牌;但确实写的驱动程序....但时间它的服用量是让我觉得我做的事情严重错误。如果没有3210,这仍然需要15分钟,该IMO仍然太长,并给我一吨小的parquet
文件。我想每天有一个大文件,我将拥有。我的代码也是通过字段值进行分区,并且它也一样慢。我也试图输出这csv
,这需要约1小时。
另外,我在提交工作时并没有真正设置跑步时间道具。我的一次作业控制台统计信息是:
- 活着工人:2个
- 芯在使用中:16总,16用于
- 使用的存储:117.5 GB总,107.5 GB二手
- 应用范围:1跑步,5完成
- 驱动程序:0运行,0已完成
- 状态:ALIVE
合并不会拖曳到驱动程序之间,在执行程序之间洗牌,但这与您所看到的问题无关。你在使用EMR吗?如果是这样,请使用s3://而不是s3a://。在Spark 1.6上,您应该使用Direct OutputCommitter,就像@David所说的那样。另一个可能的改进是将parquet.enable.summary-metadata设置为false –
在S3前面使用Alluxio能够加速它吗? –