2017-04-12 115 views
3

嗨我想运行一个长的sparkjob往往因StackoverflowError失败。作业读取parquetfile并在foreach循环中创建一个rdd。在做了一些研究之后,我认为为每个rdd创建一个检查点将帮助我解决我的记忆问题。 (我尝试了不同的内存,开销内存,并行操作,重新分区,并找到了工作中最有效的设置,但是有时它仍然会因为我们集群上的负载而失败。)rdd.checkpoint跳过火花作业

现在来我真正的问题。我试图创建检查点,首先读取创建RDD的parquet,然后缓存它,运行检查点函数,然后首先调用操作以使检查点发生。在我指定的路径中没有创建检查点,并且它表示跳过舞台的YARN UI。谁能帮我理解这个问题:)

ctx.getSparkContext().setCheckpointDir("/tmp/checkpoints"); 
    public static void writeHhidToCouchbase(DataFrameContext ctx, List<String> filePathsStrings) { 
    filePathsStrings 
     .forEach(filePath -> { 
      JavaPairRDD<String, String> rdd = 
       UidHhidPerDay.getParquetFromPath(ctx, filePath); 
      rdd.cache(); 
      rdd.checkpoint(); 
      rdd.first(); 
      rdd.foreachPartition(p -> { 
      CrumbsClient client = getClient(); 
      p.forEachRemaining(uids -> { 
       Crumbs crumbs = client.getAsync(uids._1) 
        .timeout(10, TimeUnit.SECONDS) 
        .toBlocking() 
        .first(); 
       String hHid = uids._2; 
       if (hHid != null) { 
       crumbs.getOrCreateSingletonCrumb(HouseholdCrumb.class).setHouseholdId(hHid); 
       client.putSync(crumbs); 
       } 
      }); 
      client.shutdown(); 
      }); 
     }); 
} 

检查点是在第一次迭代创建一次,但从来没有一次。 KR

+1

不确定是否相关,但它看起来像调用'rdd.checkpoint()'是多余的,rdd没有父谱系截断。因此,这对内存使用不起作用。 – ImDarrenG

+0

@ImDarrenG谢谢!你能告诉我如何在代码中完成吗? – Smastik

+0

你可以删除对checkpoint()的调用,它不会帮你。 – ImDarrenG

回答

1

我的错误实际上是创建分区。我上面提到的“第一个”分区是一个内部有分区的目录。由于目录名称像8f987639-d5c8-46b8-a1e0-37081f9f8e00我变得困惑。然而,从@ImDarrenG看到的血统评论给了我更多的见解。我从第一个缓存和检查点创建了一个新的重新分区的RDD。这使得应用程序更稳定,没有失败。

JavaPairRDD<String, String> rdd = 
      UidHhidPerDay.getParquetFromPath(ctx, filePath); 
     rdd.cache(); 
     rdd.checkpoint(); 
     rdd.first(); 
     JavaPairRDD<String, String> rddToCompute = rdd.repartition(72); 
     rddToCompute.foreachPartition... 
+0

很高兴你发现了这个问题。如果没有rdd.cache(),我会感兴趣的看看它是否稳定。 rdd.checkpoint(); rdd.first();但如果它没有损坏 - 不要修复它! – ImDarrenG

+1

谢谢,它不稳定我总是会得到x nr的失败任务foreachpartition :),但现在我没有得到:) – Smastik