2016-12-28 65 views
1

我正在编写一个PySpark作业,但我遇到了一些性能问题。 基本上,它所做的只是从卡夫卡读取事件并记录所做的转换。 问题是,转换是基于对象的函数进行计算的,并且该对象相当隆起,因为它包含一个Graph和一个自动更新的内部缓存。 所以,当我写了下面的一段代码:PySpark流媒体作业 - 避免对象序列化

analyzer = ShortTextAnalyzer(root_dir) 
logger.info("Start analyzing the documents from kafka") 
ssc.union(*streams).filter(lambda x: x[1] != None).foreachRDD(lambda rdd: rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1]))) 

它系列化我analyzer这需要,因为图的大量时间,并且当它被复制到执行,缓存是仅适用于特定的相关RDD。

如果这个工作是用Scala编写的,我可以编写一个对象,它存在于每个执行器中,然后我的对象不必每次都被序列化。

有没有办法在Python中做到这一点?为每个执行器创建一次对象,然后避免序列化过程?

感谢提前:)

UPDATE: 我读过后How to run a function on all Spark workers before processing data in PySpark?但答案没有谈论共享文件或广播的变量。 我的对象不能被广播,因为他不是只读的。它不断更新它的内部缓存,这就是为什么我需要每个执行程序都有一个对象(以避免序列化)。

回答

0

我最终做了什么,避免了我的对象被序列化,把我的类变成一个静态类 - 只有类变量和类方法。这样每个执行器都会导入一次这个类(使用它的相关变量),不需要序列化。