2016-11-05 47 views
13

Spark专家的一个很好的问题。Spark执行程序上的对象缓存

我正在处理map操作(RDD)中的数据。在映射函数中,我需要查找A类的对象以用于处理RDD中的元素。

由于这将在执行器上执行,并且创建类型为A(将被查找)的元素恰好是一个昂贵的操作,我想要在每个执行器上预加载和缓存这些对象。做这件事的最好方法是什么?

  • 一个想法是广播查找表,但类A是不可序列(在其没有实施控制)。

  • 另一个想法是将它们加载到单例对象中。但是,我想控制装入该查找表的内容(例如,不同的Spark作业中可能有不同的数据)。

理想情况下,我需要指定哪些将在执行人一次装入(包括流的情况下,使查找表停留在批次之间的内存),通过将可在驾驶过程中的参数它的启动,在任何数据被处理之前。

是否有干净优雅的做法,或者它不可能实现?

+0

为什么没有查找表也分布?所以你可以使用DataFrames来连接两组数据?如果总是需要查找数据,那么每次需要运行计算时都需要承受广播数据的费用? – DevZer0

+1

@ DevZer0 _A不是serializable_。 – 2016-11-05 14:07:25

回答

3

这正是broadcast.的目标用例。广播变量只发送一次,并使用种子高效地移动到所有执行程序,并保留在内存/本地磁盘中,直到不再需要它们为止。

序列化在使用其他接口时经常会弹出一个问题。如果你可以强制你使用的对象是可序列化的,那将是最好的解决方案。如果这是不可能的,你的生活会变得更复杂一些。如果无法序列​​化A对象,则必须在执行器上为每项任务创建它们。如果他们存储在文件中的某个地方,这看起来是这样的:

rdd.mapPartitions { it => 
    val lookupTable = loadLookupTable(path) 
    it.map(elem => fn(lookupTable, elem)) 
} 

请注意,如果你采用这种模式,那么你必须每一次任务加载查找表 - 你不能受益于广播变量的跨任务持久性。

编辑:这里是另一个模型,我相信可以让您在每个JVM的任务之间共享查找表。

class BroadcastableLookupTable { 
    @transient val lookupTable: LookupTable[A] = null 

    def get: LookupTable[A] = { 
    if (lookupTable == null) 
     lookupTable = < load lookup table from disk> 
    lookupTable 
    } 
} 

这个类可以广播(没有实质性的传输),并且第一次调用每个JVM时,您将加载查找表并返回它。

+0

不幸的是,这些对象是不可序列化的,所以我们确实需要采用第二种方法,就像你所描述的那样。但是,我们还必须能够跨任务共享查找表。 – DruckerBg

+0

为什么你需要分享任务?你是否正在更新地图操作中的查找表? – Tim

+1

增加了一种可能的方式来做到这一点。 – Tim

3

如果序列化结果不可能,那么如何将查找对象存储在数据库中?这不是最简单的解决方案,但应该可以正常工作。我可以推荐检查例如spark-redis,但我确定有更好的解决方案。

+0

谢谢,这是一个不错的解决方案。一个问题是这些实际上是JVM中的一些对象。 – DruckerBg

+0

我更新了这个问题,包括:“...创建类型A(将被查找)的元素碰巧是一个昂贵的操作...” – DruckerBg

+0

如何将JVM对象存储为字节数组,Redis? –

0

由于A不是可序列化的,因此最简单的解决方案是创建自己的可序列化类型A1,其中包含计算所需的所有A数据。然后在广播中使用新的查找表。