1
我对Python和Spark的相当新颖,但让我看看我是否可以解释我正在尝试做什么。在Python中对类方法调用mapPartition
我有一堆不同类型的页面,我想处理。我为这些页面的所有常见属性创建了一个基类,然后从基类继承了一个特定于页面的类。这个想法是,火星跑步者将能够通过改变被调用时的页面类型来为所有页面做确切的事情。
亚军
def CreatePage(pageType):
if pageType == "Foo":
return PageFoo(pageType)
elif pageType == "Bar":
return PageBar(pageType)
def Main(pageType):
page = CreatePage(pageType)
pageList_rdd = sc.parallelize(page.GetPageList())
return = pageList_rdd.mapPartitions(lambda pageNumber: CreatePage(pageType).ProcessPage(pageNumber))
print Main("Foo")
PageBaseClass.py
class PageBase(object):
def __init__(self, pageType):
self.pageType = None
self.dbConnection = None
def GetDBConnection(self):
if self.dbConnection == None:
# Set up a db connection so we can share this amongst all nodes.
self.dbConnection = DataUtils.MySQL.GetDBConnection()
return self.dbConnection
def ProcessPage():
raise NotImplementedError()
PageFoo.py
class PageFoo(PageBase, pageType):
def __init__(self, pageType):
self.pageType = pageType
self.dbConnetion = GetDBConnection()
def ProcessPage():
result = self.dbConnection.cursor("SELECT SOMETHING")
# other processing
有很多的,我从简洁省略其他网页的特定功能,但想法是,我想保留如何在页面类中处理该页面的所有逻辑。而且,能够共享像db连接和s3存储桶这样的资源。
我知道我现在拥有它的方式,它为rdd中的每个项目创建一个新的Page对象。有没有办法做到这一点,以便它只创建一个对象?有更好的模式吗?谢谢!
感谢您的建议。非常感激。我忘记了我切换到使用mapPartitions,所以我会更新问题的这一部分。我会检查博格图案。 – zdubu