2016-06-12 66 views
4

我已经使用sc.setCheckpointDir方法设置了检查点目录。RDD.checkpoint()不在检查点目录中存储任何数据

/checkpointDirectory/ 

我然后创建了一个RDD的检查点:rdd.checkpoint(),并在目录中,我现在看到一个新的目录代表新的关卡,在字母随机字符串的形式。在那个目录里面什么也没有。

/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/ [empty] 

然后做了几个转换之后,我跑rdd.checkpoint()一遍,仍然没有什么,最近创建的目录

/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/ [empty] 

我使用checkpoint()错了吗?我应该在目录中看到什么才能知道它的正常工作?

回答

4

checkpoint,与Spark中的许多其他操作一样,是懒惰的。数据实际上是检查点,当且仅当给定的RDD被实现时。您看到的空目录是特定于应用程序的检查点目录。

如果您希望检查点发生,您必须触发一个操作来评估相应的RDD。举例(本地模式):

import glob 
import os 
from urllib.parse import urlparse 

sc.setCheckpointDir("/tmp/checkpoints/") 
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*") 

rdd = sc.range(1000, 10) 
plus_one = rdd.map(lambda x: x + 1) 
plus_one.cache() 
plus_one.checkpoint() # No checkpoint dir here yet 

[os.path.split(x)[-1] for x in glob.glob(ch_dir)] 
## [] 
plus_one.isCheckpointed() 
## False 

# After count is executed you'll see rdd specific checkpoint dir 
plus_one.count() 
[os.path.split(x)[-1] for x in glob.glob(ch_dir)] 
## ['rdd-1'] 
plus_one.isCheckpointed() 
## True 

您还可以分析调试字符串之前:

## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated] 
## | ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 [Memory Serialized 1x Replicated] 

和行动后:

## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated] 
## |  CachedPartitions: 8; MemorySize: 168.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B 
## | ReliableCheckpointRDD[3] at count at <ipython-input-16-96e746c56973>:1 [Memory Serialized 1x Replicated] 

正如你可以看到RDD将被计算之前从零开始,但在count之后,您将获得ReliableCheckpointRDD

0

检查点将定期完成(即检查点持续时间)。您需要将检查点持续时间告诉给您的spark上下文。