2016-04-29 56 views
13

我试图使用Spark 1.6.1parquet文件写入Amazon S3。我生成的小parquet是一次写入~2GB,所以它不是那么多的数据。我试图证明Spark作为我可以使用的平台。使用Spark在s3a上向s3写入实木复合地址文件非常慢

基本上我正在设置一个star schemadataframes,然后我将这些表写出来实木复合地板。数据来自供应商提供的csv文件,我使用Spark作为ETL平台。我目前在ec2(r3.2xlarge)有3个节点的集群,所以120GB的执行内存和总共16个内核。

输入文件总计大约22GB,现在我正在提取大约2GB的数据。最终,当我开始加载完整数据集时,这将会有很多TB。

这里是我的火花/斯卡拉pseudocode

def loadStage(): Unit = { 
    sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData") 
    sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter") 
    sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false") 
    var sqlCtx = new SQLContext(sc) 


    val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz") 

    //Setup header table/df 
    val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1") 
    val headerSchemaDef = "market_no,rel_date,field1, field2, field3....." 
    val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false))) 
    val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6))) 
    val header = sqlCtx.createDataFrame(headerRecords, headerSchema) 
    header.registerTempTable("header") 
    sqlCtx.cacheTable("header") 


    //Setup fact table/df 
    val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2") 
    val factSchemaDef = "market_no,rel_date,field1, field2, field3....." 
    val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false))) 
    val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10))) 
    val df = sqlCtx.createDataFrame(records, factSchema) 
    df.registerTempTable("fact") 

    val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date") 


    println(results.count()) 



    results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet") 


    } 

计大约需要2分钟465884512行。到地板的写操作38分钟

我明白​​3210做了洗牌;但确实写的驱动程序....但时间它的服用量是让我觉得我做的事情严重错误。如果没有​​3210,这仍然需要15分钟,该IMO仍然太长,并给我一吨小的parquet文件。我想每天有一个大文件,我将拥有。我的代码也是通过字段值进行分区,并且它也一样慢。我也试图输出这csv,这需要约1小时。

另外,我在提交工作时并没有真正设置跑步时间道具。我的一次作业控制台统计信息是:

  • 活着工人:2个
  • 芯在使用中:16总,16用于
  • 使用的存储:117.5 GB总,107.5 GB二手
  • 应用范围:1跑步,5完成
  • 驱动程序:0运行,0已完成
  • 状态:ALIVE
+1

合并不会拖曳到驱动程序之间,在执行程序之间洗牌,但这与您所看到的问题无关。你在使用EMR吗?如果是这样,请使用s3://而不是s3a://。在Spark 1.6上,您应该使用Direct OutputCommitter,就像@David所说的那样。另一个可能的改进是将parquet.enable.summary-metadata设置为false –

+0

在S3前面使用Alluxio能够加速它吗? –

回答

14

星火defaul在I/O操作期间,ts会导致大量(可能)不必要的开销,特别是在写入S3时。 This article对此进行了更彻底的讨论,但您需要考虑更改2个设置。

  • 使用DirectParquetOutputCommitter。默认情况下,Spark会将所有数据保存到临时文件夹,然后移动这些文件。使用DirectParquetOutputCommitter将直接书面方式向S3输出路径

    • No longer available in Spark 2.0+
      • 正如JIRA客票记载节省时间,目前的解决方案是
        1. 代码切换到使用s3a和Hadoop 2.7.2+;这是更好的全面,获取Hadoop的2.8更好,并且是s3guard
        2. 使用Hadoop的FileOutputCommitter的基础,并设置mapreduce.fileoutputcommitter.algorithm.version 2
  • 关闭架构合并。如果启用了模式合并,驱动程序节点将扫描所有文件以确保一致的模式。这是非常昂贵的,因为它不是分布式操作。确保这是做

    val file = sqx.read.option("mergeSchema", "false").parquet(path)

+2

截止到Spark 2.0 DirectParquetOutputCommitter不再可用。查看[SPARK-10063](https://issues.apache.org/jira/browse/SPARK-10063)获取新解决方案 –

+0

@TalJoffe您是否尝试过他们的解决方案?如果是这样,它是如何工作的?你能回答如何? – David

+0

我确实尝试过,效果很好。我在一个30g文件夹上做了一个小测试,性能几乎相同 –

3

直接输出提交者从火花代码库去关闭;你要在你自己的JAR中编写你自己的/重新生成已删除的代码。如果您这样做,请在工作中关闭猜测,并知道其他故障也可能导致问题,问题在于“无效数据”。 Hadoop 2.8将会添加一些S3A加速专门用于读取S3以外的优化二进制格式(ORC,Parquet);另一方面,详情请参阅HADOOP-11694。有些人正在努力将Amazon Dynamo用于一致的元数据存储,这应该能够在工作结束时进行可靠的O(1)提交。

相关问题