2016-09-19 125 views
1

我对Python和Spark的相当新颖,但让我看看我是否可以解释我正在尝试做什么。在Python中对类方法调用mapPartition

我有一堆不同类型的页面,我想处理。我为这些页面的所有常见属性创建了一个基类,然后从基类继承了一个特定于页面的类。这个想法是,火星跑步者将能够通过改变被调用时的页面类型来为所有页面做确切的事情。

亚军

def CreatePage(pageType): 
    if pageType == "Foo": 
     return PageFoo(pageType) 
    elif pageType == "Bar": 
     return PageBar(pageType) 

def Main(pageType): 
    page = CreatePage(pageType) 
    pageList_rdd = sc.parallelize(page.GetPageList()) 
    return = pageList_rdd.mapPartitions(lambda pageNumber: CreatePage(pageType).ProcessPage(pageNumber)) 

print Main("Foo") 

PageBaseClass.py

class PageBase(object): 
    def __init__(self, pageType): 
     self.pageType = None 
     self.dbConnection = None 

    def GetDBConnection(self): 
     if self.dbConnection == None: 
     # Set up a db connection so we can share this amongst all nodes. 
     self.dbConnection = DataUtils.MySQL.GetDBConnection() 
     return self.dbConnection 

    def ProcessPage(): 
     raise NotImplementedError() 

PageFoo.py

class PageFoo(PageBase, pageType): 
    def __init__(self, pageType): 
     self.pageType = pageType 
     self.dbConnetion = GetDBConnection() 

    def ProcessPage(): 
     result = self.dbConnection.cursor("SELECT SOMETHING") 
     # other processing 

有很多的,我从简洁省略其他网页的特定功能,但想法是,我想保留如何在页面类中处理该页面的所有逻辑。而且,能够共享像db连接和s3存储桶这样的资源。

我知道我现在拥有它的方式,它为rdd中的每个项目创建一个新的Page对象。有没有办法做到这一点,以便它只创建一个对象?有更好的模式吗?谢谢!

回答

0

几点建议:

  • 不要直接创建连接。使用连接池(因为每个执行程序使用单独的进程设置池大小为一个很好),并确保连接在超时时自动关闭)。
  • 使用Borg pattern来存储池并调整您的代码以使用它来检索连接。
  • 您将无法共享节点之间或甚至单个节点内的连接(请参阅有关单独进程的注释)。您可以获得的最佳保证是每个分区的单个连接(或许多分区与解释器重用)。
+0

感谢您的建议。非常感激。我忘记了我切换到使用mapPartitions,所以我会更新问题的这一部分。我会检查博格图案。 – zdubu