2014-11-21 109 views
5

我想在spark中执行我的数据的geoip查找。为此,我使用MaxMind的geoIP数据库。如何在spark中执行初始化?

我想要做的是在每个分区上初始化一次geoip数据库对象,然后用它来查找与IP地址相关的城市。

spark对每个节点都有一个初始化阶段,还是应该检查一个实例变量是否未定义?如果是,请在继续之前对它进行初始化?例如。类似的信息(这是蟒蛇,但我希望有一个解决方案阶):

class IPLookup(object): 
    database = None 

    def getCity(self, ip): 
     if not database: 
     self.database = self.initialise(geoipPath) 
    ... 

当然,这样做需要的火花将连载整个对象,该文档警告反对的东西。

回答

1

这似乎是广播变量的一个很好的用法。你有没有看过该功能的文档,如果你有它不能满足你的要求?

+1

我试过使用广播变量。但它没有奏效。可能是因为com.maxmind.geoip.LookupService不可序列化。我尝试使用SparContext.addFile方法,而且工作正常。添加文件GeoIPCity.dat和GeoIPASNum.dat – 2015-03-10 18:46:16

5

在火花,每分区中的操作可以是使用做:

def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false) 

此映射器将在元件的一个迭代执行每个分区一次函数f。这个想法是,设置资源(如数据库连接)的成本将通过迭代器中多个元素的使用而抵消。

例子:

val logsRDD = ??? 
logsRDD.mapPartitions{iter => 
    val geoIp = new GeoIPLookupDB(...) 
    // this is local map over the iterator - do not confuse with rdd.map 
    iter.map(elem => (geoIp.resolve(elem.ip),elem)) 
} 
+0

一个很好的解决方案,但在这种情况下,我想在多个操作中重用该对象,所以广播变量对我来说看起来更有用。 – jbrown 2014-11-25 10:14:56

0

由于@bearrito提到的 - 你可以使用加载地理数据库,然后从驱动器播放。 另一个需要考虑的选择是提供可用于查找的外部服务。它可能是内存缓存,如Redis/Memcached/Tacheyon或常规数据存储。