Spark writing files and appending to S3 - cost issue

So I have an Apache Spark stream that writes parquet files to S3 every 20 minutes, partitioned by day and hour.
It seems that before writing, each batch performs an "ls" and a "head" on all the folders under this table's (root folder's) name.

Since we have multiple days × 24 hours × several different tables, this adds up to a relatively high overall S3 cost.

Please note that our schema changes dynamically.

So my questions are:

  1. Is it correct that it recursively reads the headers of all the existing parquet files before every write?

  2. Why doesn't the stream cache this information, and can it be cached? (See the config sketch after this list.)

  3. Can you suggest best practices here?
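
One knob that may bear on questions 1 and 2 is parquet schema merging: since our schema changes dynamically, Spark may be re-reading every parquet footer to reconcile schemas on each batch. A minimal sketch, assuming only the documented Spark SQL option spark.sql.parquet.mergeSchema (the session builder and app name below are illustrative, not our actual setup):

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .appName("partitioned-s3-writer")   // hypothetical app name
    // Documented Spark SQL option: when false, Spark does not read every
    // parquet footer to merge schemas. Whether this also removes the
    // per-batch "ls"/"head" calls on the table root is an assumption to verify.
    .config("spark.sql.parquet.mergeSchema", "false")
    .getOrCreate();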

// The write code:

withPartition.write()
       .format(format)               // parquet in our case
       .mode(SaveMode.Append)        // each 20-minute batch appends to the table
       .partitionBy("day", "hour")   // Spark lays out day=DD/hour=HH subfolders
       .save(path);

It seems that this problem is related to:

https://issues.apache.org/jira/browse/SPARK-20049

Spark partitionBy much slower than without it

Answer

I found that Spark's partitionBy was the cause of this problem:

Spark partitionBy much slower than without it

So I implemented it as shown below; it solved the problem, and it also improved performance:

withPartition = withPartition.persist(StorageLevel.MEMORY_AND_DISK()); // batch is re-scanned once per (day, hour) pair below

    // Collect the distinct (day, hour) pairs present in this batch.
    Dataset<DayAndHour> daysAndHours = withPartition
      .map(mapToDayHour(), Encoders.bean(DayAndHour.class))
      .distinct();

    DayAndHour[] collect = (DayAndHour[]) daysAndHours.collect();
    Arrays.sort(collect);
    logger.info("found " + collect.length + " different days and hours: "
      + Arrays.stream(collect).map(DayAndHour::toString).collect(Collectors.joining(",")));

    long time = System.currentTimeMillis();
    long cnt = 0; // total events written across all partitions
    for (DayAndHour dayAndHour : collect) {
      int day = dayAndHour.getDay();
      int hour = dayAndHour.getHour();
      logger.info("Start filter on " + dayAndHour);

      // Drop the partition columns; they are encoded in the target path instead.
      Dataset<Row> filtered = withPartition.filter(filterDayAndHour(day, hour))
        .drop("day", "hour");

      // Build the Hive-style partition path by hand, e.g. .../day=7/hour=13
      String newPath = path + "/"
        + "day" + "=" + day + "/"
        + "hour" + "=" + hour;

      long specificPathCount = filtered.count();
      cnt += specificPathCount;
      long timeStart = System.currentTimeMillis();
      logger.info("writing " + specificPathCount + " events to " + newPath);

      // Plain save to the concrete partition path: no partitionBy, so Spark
      // does not list and scan the whole table before writing.
      filtered.write()
        .format(format)
        .mode(SaveMode.Append)
        .save(newPath);

      logger.info("Finish writing partition of " + dayAndHour + " to " + newPath
        + ". Wrote [" + specificPathCount + "] events in "
        + TimeUtils.tookMinuteSecondsAndMillis(timeStart, System.currentTimeMillis()));
    }
    logger.info("Finish writing " + path + ". Wrote [" + cnt + "] events in "
      + TimeUtils.tookMinuteSecondsAndMillis(time, System.currentTimeMillis()));
    withPartition.unpersist();

private static MapFunction<Row, DayAndHour> mapToDayHour() {
    return new MapFunction<Row, DayAndHour>() {
      @Override
      public DayAndHour call(Row value) throws Exception {
        int day = value.getAs("day");
        int hour = value.getAs("hour");
        DayAndHour dayAndHour = new DayAndHour();
        dayAndHour.setDay(day);
        dayAndHour.setHour(hour);
        return dayAndHour;
      }
    };
}

private static FilterFunction<Row> filterDayAndHour(int day, int hour) {
    return new FilterFunction<Row>() {
      @Override
      public boolean call(Row value) throws Exception {
        int cDay = value.getAs("day");
        int cHour = value.getAs("hour");

        return day == cDay && hour == cHour;
      }
    };
}
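
As an aside, MapFunction and FilterFunction are functional interfaces, so on Java 8+ the two factory methods above can be collapsed into lambdas. A minimal sketch, assuming day and hour are effectively final locals in scope; the names toDayHour and onDayAndHour are mine:

MapFunction<Row, DayAndHour> toDayHour = row -> {
    DayAndHour dh = new DayAndHour();
    dh.setDay(row.<Integer>getAs("day"));
    dh.setHour(row.<Integer>getAs("hour"));
    return dh;
};
// Usage: withPartition.map(toDayHour, Encoders.bean(DayAndHour.class)).distinct();

FilterFunction<Row> onDayAndHour = row ->
    row.<Integer>getAs("day") == day && row.<Integer>getAs("hour") == hour;
// Usage: withPartition.filter(onDayAndHour).drop("day", "hour");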

// And the other POJO:

public class DayAndHour implements Serializable , Comparable<DayAndHour>{ 

    private int day; 
    private int hour; 

    public int getDay() { 
     return day; 
    } 

    public void setDay(int day) { 
     this.day = day; 
    } 

    public int getHour() { 
     return hour; 
    } 

    public void setHour(int hour) { 
     this.hour = hour; 
    } 

    @Override 
    public boolean equals(Object o) { 
     if (this == o) return true; 
     if (o == null || getClass() != o.getClass()) return false; 

     DayAndHour that = (DayAndHour) o; 

     if (day != that.day) return false; 
     return hour == that.hour; 
    } 

    @Override 
    public int hashCode() { 
     int result = day; 
     result = 31 * result + hour; 
     return result; 
    } 

    @Override 
    public String toString() { 
     return "(" + 
       "day=" + day + 
       ", hour=" + hour + 
       ')'; 
    } 

    @Override
    public int compareTo(DayAndHour dayAndHour) {
     // hour < 100, so (day * 100) + hour sorts chronologically: by day, then hour.
     return Integer.compare((day * 100) + hour, (dayAndHour.day * 100) + dayAndHour.hour);
    }
}
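
Incidentally, the DayAndHour bean and the Encoders.bean round trip can likely be avoided altogether. A hedged alternative sketch (untested; it reuses the withPartition dataset and the per-partition write loop from above), collecting the distinct (day, hour) pairs straight from the two columns:

import java.util.List;
import org.apache.spark.sql.Row;

// select / distinct / collectAsList are standard Dataset API calls.
List<Row> pairs = withPartition.select("day", "hour").distinct().collectAsList();
for (Row r : pairs) {
    int day = r.<Integer>getAs("day");
    int hour = r.<Integer>getAs("hour");
    // ... filter on (day, hour) and write to the hand-built path, as in the loop above ...
}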