2017-08-25 34 views
2

我有一个相当复杂的Apache PySpark管道,它对(很大的)一组文本文件执行几个转换。我的管道的预期产量是管道的不同阶段。这是最好的方式(即更有效率,但更多波光粼粼的,意义在于:更适合Spark编程模型和风格)来做到这一点?保存我的Apache Spark管道的中间状态

现在,我的代码如下所示:

# initialize the pipeline and perform the first set of transformations. 
ctx = pyspark.SparkContext('local', 'MyPipeline') 
rdd = ctx.textFile(...).map(...).map(...) 

# first checkpoint: the `first_serialization` function serializes 
# the data into properly formatted string. 
rdd..map(first_serialization).saveAsTextFile("ckpt1") 

# here, I have to read again from the previously saved checkpoint 
# using a `first_deserialization` function that deserializes what has 
# been serialized from the `firs_serialization` function. Then performs 
# other transformations. 
rdd = ctx.textFile("ckpt1").map(...).map(...) 

等。我想摆脱序列化方法和多次保存/读取 - 顺便说一下,它会影响效率吗?我承认是的。

任何提示? 在此先感谢。

回答

1

这似乎很简单,因为它是,但我会建议编写中间阶段,同时继续重用现有的RDD(侧栏:使用数据集/数据框而不是RDD获得更多性能)并继续处理,编写随时随地取得中间结果。

当您已经处理了数据(理想情况下甚至是缓存!)以供进一步使用时,无需支付从磁盘/网络读取的处罚。

使用自己的代码示例:

# initialize the pipeline and perform the first set of transformations. 
ctx = pyspark.SparkContext('local', 'MyPipeline') 
rdd = ctx.textFile(...).map(...).map(...) 

# first checkpoint: the `first_serialization` function serializes 
# the data into properly formatted string. 
string_rdd = rdd..map(first_serialization) 
string_rdd.saveAsTextFile("ckpt1") 

# reuse the existing RDD after writing out the intermediate results 
rdd = rdd.map(...).map(...) # rdd here is the same variable we used to create the string_rdd results above. alternatively, you may want to use the string_rdd variable here instead of the original rdd variable. 
+0

请你提高你的答案添加例如一些示例和/或一些参考代码的链接?谢谢。 – petrux

+0

@petrux,我使用你自己的代码提供了一个例子。我强烈建议评估如何使用Spark 2.x(2.2是本文写作的最新版本)数据结构,如Dataset和DataFrame(在python中,只有pyspark sql DataFrame,Dataset不像Scala中那样)。 – Garren

+0

@加伦:非常感谢。所以我只需要保存为文本文件。好。关于火花版本,我使用2.2。但我不知道使用DataFrame是否适合我的任务。无论如何,我会看看,谢谢你的建议。 – petrux