2015-12-02 73 views
0

RDD.saveAsTextFile最后一个阶段非常缓慢。我怀疑记录不是均匀分布在分区和任务上的问题。有什么办法可以强制这个吗?Spark没有将负载均匀分配到任务

Last Task never finishes

public static JavaRDD<String> getJsonUserIdVideoIdRDD(JavaRDD<Rating> cachedRating, 
                 JavaPairRDD<Integer, Integer> userIdClusterId, 
                 int numPartitions, String outDir){ 
    /* 
    convert the JavaRDD<Rating> to JavaPairRDD<Integer,DmRating> 
    */ 
    JavaPairRDD<Integer,DmRating> userIdDmRating = cachedRating.mapToPair(new PairFunction<Rating, Integer, DmRating>() { 
     public Tuple2<Integer, DmRating> call(Rating dmRating) throws Exception { 
      return new Tuple2<>(dmRating.user(), (DmRating)dmRating); 
     } 
    }); 

    /* 
    join this RDD with userIdClusterID RDD by key 
    */ 
    JavaPairRDD<Integer, Tuple2<Integer, DmRating>> userId_T_clusterIdDmRating = userIdClusterId.join(userIdDmRating, numPartitions); 

    // extract the clusterId to videoId map 
    JavaPairRDD<Integer, Integer> clusterIdVideoId = userId_T_clusterIdDmRating.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Integer,DmRating>>, Integer, Integer>() { 
     public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, DmRating>> userIdDmRatingClusterId) throws Exception { 
      Integer userId = userIdDmRatingClusterId._1(); 
      Tuple2<Integer, DmRating> dmRatingClusterId = userIdDmRatingClusterId._2(); 
      return new Tuple2<Integer, Integer>(dmRatingClusterId._1(), dmRatingClusterId._2().product()); 
     } 
    }); 
    ////// 
    /// Count the popularity of a video in a cluster 
    JavaPairRDD<String, Integer> clusterIdVideoIdStrInt = clusterIdVideoId.mapToPair(new PairFunction<Tuple2<Integer, Integer>, String, Integer>() { 
     @Override 
     public Tuple2<String, Integer> call(Tuple2<Integer, Integer> videoIdClusterId) throws Exception { 
      return new Tuple2<>(String.format("%d:%d", videoIdClusterId._1(), videoIdClusterId._2()), 1); 
     } 
    }); 
    JavaPairRDD<String, Integer> clusterIdVideoIdStrCount = clusterIdVideoIdStrInt.reduceByKey(new Function2<Integer, Integer, Integer>() { 
     @Override 
     public Integer call(Integer v1, Integer v2) throws Exception { 
      return v1+v2; 
     } 
    }); 
    /// 

    JavaPairRDD<Integer, Tuple2<Integer, Integer>> clusterId_T_videoIdCount = clusterIdVideoIdStrCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, Tuple2<Integer, Integer>>() { 
     @Override 
     public Tuple2<Integer, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> clusterIdVideoIdStrCount) throws Exception { 
      String[] splits = clusterIdVideoIdStrCount._1().split(":"); 
      try{ 
       if(splits.length==2){ 
        int clusterId = Integer.parseInt(splits[0]); 
        int videoId = Integer.parseInt(splits[1]); 
        return new Tuple2<>(clusterId, new Tuple2<>(videoId, clusterIdVideoIdStrCount._2())); 
       }else{ 
        //Should never occur 
        LOGGER.error("Could not split {} into two with : as the separator!", clusterIdVideoIdStrCount._1()); 
       } 
      }catch (NumberFormatException ex){ 
       LOGGER.error(ex.getMessage()); 
      } 
      return new Tuple2<>(-1, new Tuple2<>(-1,-1)); 
     } 
    }); 

    JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>> clusterIdVideoIdGrouped = clusterId_T_videoIdCount.groupByKey(); 

    JavaPairRDD<Integer, DmRating> clusterIdDmRating = userId_T_clusterIdDmRating.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Integer, DmRating>>, Integer, DmRating>() { 
     @Override 
     public Tuple2<Integer, DmRating> call(Tuple2<Integer, Tuple2<Integer, DmRating>> userId_T_clusterIdDmRating) throws Exception { 
      return userId_T_clusterIdDmRating._2(); 
     } 
    }); 

    JavaPairRDD<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>> clusterId_T_DmRatingVideoIds = clusterIdDmRating.join(clusterIdVideoIdGrouped, numPartitions); 

    JavaPairRDD<Integer, String> userIdStringRDD = clusterId_T_DmRatingVideoIds.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>>, Integer, String>() { 
     @Override 
     public Tuple2<Integer, String> call(Tuple2<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>> v1) throws Exception { 
      int clusterId = v1._1(); 
      Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>> tuple = v1._2(); 
      DmRating rating = tuple._1(); 
      Iterable<Tuple2<Integer, Integer>> videosCounts= tuple._2(); 
      StringBuilder recosStr = new StringBuilder(); 
      boolean appendComa = false; 
      for(Tuple2<Integer, Integer> videoCount : videosCounts){ 
       if(appendComa) recosStr.append(","); 
       recosStr.append("{"); 
       recosStr.append("\"video_id\":"); 
       recosStr.append(videoCount._1()); 
       recosStr.append(","); 
       recosStr.append("\"count\":"); 
       recosStr.append(videoCount._2()); 
       recosStr.append("}"); 
       appendComa = true; 
      } 
      String val = String.format("{\"user_id\":\"%s\",\"v1st\":\"%s\",\"redis_uid\":%s,\"cluster_id\": %d,\"recommendations\":[ %s ]}", rating.dmUserId, rating.dmV1stStr, rating.user(), clusterId, recosStr); 
      return new Tuple2<Integer, String>(rating.user(), val); 
     } 
    }); 
    JavaPairRDD<Integer, Iterable<String>> groupedRdd = userIdStringRDD.groupByKey(numPartitions); 
    JavaRDD<String> jsonStringRdd = groupedRdd.map(new Function<Tuple2<Integer, Iterable<String>>, String>() { 
     @Override 
     public String call(Tuple2<Integer, Iterable<String>> v1) throws Exception { 
      for(String str : v1._2()){ 
       return str; 
      } 
      LOGGER.error("Could not fetch a string from iterable so returning empty"); 
      return ""; 
     } 
    }); 

    //LOGGER.info("Number of items in RDD: {}", jsonStringRDD.count()); 
    //return jsonStringRDD.persist(StorageLevel.MEMORY_ONLY_SER_2()); 
    LOGGER.info("Repartitioning the data into {}", numPartitions); 
    jsonStringRdd.cache().saveAsTextFile(outDir); 
    return jsonStringRdd; 
} 

群集大小: 1.主:16 CPU,32GB 2.工人4:32CPU,102GB,4X375GB SSD驱动器

我改变了代码中使用DataFrames代替。还是同样的问题

public static void saveAlsKMeansRecosAsParquet(JavaPairRDD<Integer, Tuple2<DmRating, Integer>> userIdRatingClusterIdRDD, 
               int numPartitions, 
               JavaSparkContext javaSparkContext, 
               String outdir){ 

    JavaRDD<DmRating> dmRatingJavaRDD = userIdRatingClusterIdRDD.map(new Function<Tuple2<Integer, Tuple2<DmRating, Integer>>, DmRating>() { 
     public DmRating call(Tuple2<Integer, Tuple2<DmRating, Integer>> v1) throws Exception { 
      //Integer userId = v1._1(); 
      Tuple2<DmRating, Integer> values = v1._2(); 
      DmRating rating = values._1(); 
      Integer clusterId = values._2(); 
      rating.setClusterId(clusterId); 
      rating.setVideoId(rating.product()); 
      rating.setV1stOrUserId((rating.userId== null || rating.userId.isEmpty())? rating.v1stId : rating.userId); 
      rating.setRedisId(rating.user()); 
      return rating; 
      //return String.format("{\"clusterId\": %s,\"userId\": %s, \"userId\":\"%s\", \"videoId\": %s}", clusterId, userId, rating.userId, rating.product()); 
     } 
    }); 
    SQLContext sqlContext = new SQLContext(javaSparkContext); 
    DataFrame dmRatingDF = sqlContext.createDataFrame(dmRatingJavaRDD, DmRating.class); 
    dmRatingDF.registerTempTable("dmrating"); 
    DataFrame clusterIdVideoIdDF = sqlContext.sql("SELECT clusterId, videoId FROM dmrating").cache(); 
    DataFrame rolledupClusterIdVideoIdDF = clusterIdVideoIdDF.rollup("clusterId","videoId").count().cache(); 
    DataFrame clusterIdUserIdDF = sqlContext.sql("SELECT clusterId, userId, redisId, v1stId FROM dmrating").distinct().cache(); 
    JavaRDD<Row> rolledUpRDD = rolledupClusterIdVideoIdDF.toJavaRDD(); 
    JavaRDD<Row> filteredRolledUpRDD = rolledUpRDD.filter(new Function<Row, Boolean>() { 
     @Override 
     public Boolean call(Row v1) throws Exception { 
      //make sure the size and values of the properties are correct 
      return !(v1.size()!=3 || v1.isNullAt(0) || v1.isNullAt(1) || v1.isNullAt(2)); 
     } 
    }); 

    JavaPairRDD<Integer, Tuple2<Integer, Integer>> clusterIdVideoIdCount = filteredRolledUpRDD.mapToPair(new PairFunction<Row, Integer, Tuple2<Integer, Integer>>() { 
     @Override 
     public Tuple2<Integer, Tuple2<Integer, Integer>> call(Row row) throws Exception { 
      Tuple2<Integer, Integer> videoIdCount = new Tuple2<Integer, Integer>(row.getInt(1), Long.valueOf(row.getLong(2)).intValue()); 
      return new Tuple2<Integer, Tuple2<Integer, Integer>>(row.getInt(0),videoIdCount); 
     } 
    }).cache(); 
    JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>> groupedPair = clusterIdVideoIdCount.groupByKey(numPartitions).cache(); 
    JavaRDD<ClusterIdVideos> groupedFlat = groupedPair.map(new Function<Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>>, ClusterIdVideos>() { 
     @Override 
     public ClusterIdVideos call(Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>> v1) throws Exception { 
      ClusterIdVideos row = new ClusterIdVideos(); 
      Iterable<Tuple2<Integer, Integer>> videosCounts= v1._2(); 
      StringBuilder recosStr = new StringBuilder(); 
      recosStr.append("["); 
      boolean appendComa = false; 
      for(Tuple2<Integer, Integer> videoCount : videosCounts){ 
       if(appendComa) recosStr.append(","); 
       recosStr.append("{"); 
       recosStr.append("\"video_id\":"); 
       recosStr.append(videoCount._1()); 
       recosStr.append(","); 
       recosStr.append("\"count\":"); 
       recosStr.append(videoCount._2()); 
       recosStr.append("}"); 
       appendComa = true; 
      } 
      recosStr.append("]"); 
      row.setClusterId(v1._1()); 
      row.setVideos(recosStr.toString()); 
      return row; 
     } 
    }).cache(); 

    DataFrame groupedClusterId = sqlContext.createDataFrame(groupedFlat, ClusterIdVideos.class); 
    DataFrame recosDf = clusterIdUserIdDF.join(groupedClusterId, "clusterId"); 
    recosDf.write().parquet(outdir); 
} 

enter image description here

+0

使用“.repartition(numPartitions:Int)”。它会增加并行性,但这可能无法解决你的问题...你可以发布代码。 – Sumit

+0

试图..不影响时间或分区 – Ram

+0

我们可能能够帮助但需要查看代码。所以请张贴代码。 – Sumit

回答

0

好发现的问题。罪魁祸首是groupBy和加入操作。 Spark的网站上的文档说明

在Spark中,数据通常不是分布在分区上,而是在特定操作的必要位置。在计算过程中,单个任务将在单个分区上运行 - 因此,要组织执行单个reduceByKey reduce任务的所有数据,Spark需要执行全部操作。它必须从所有分区中读取以找到所有键的所有值,然后将各分区中的值汇总以计算每个键的最终结果 - 这称为混洗。

要优化任何join/groupByKey操作,目标应该是减少随机播放。我发现this deck对诊断问题非常有帮助。尤其是幻灯片12

我知道群集ID数据非常小,每次运行100个群集,所以我为较小的表创建了一个广播变量,并将其广播给所有执行者并将该变量用于连接。这很有效,将计算时间从2小时减少到10分钟。

//convert json string to DF 
    DataFrame groupedClusterId = sqlContext.read().json(groupedFlat.rdd()); 
    Broadcast<DataFrame> broadcastDataFrame= javaSparkContext.broadcast(groupedClusterId); 

    DataFrame recosDf = clusterIdUserIdDF.join(broadcastDataFrame.value(),"clusterId"); 
    recosDf.write().parquet(outdir);