2017-04-15 234 views
0

所以我一直在尝试几天来在Spark的map函数里面运行ML算法。我贴一个更具体的question但引用星火的ML算法使我有以下错误:在Spark中运行ML算法里面的map函数

AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized? 

很明显,我不能引用SparkContextapply_classifier函数内。 我的代码是类似于在前面的问题,我问建议,但至今还没有找到一个解决我所期待的:

def apply_classifier(clf): 
    dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3) 
    if clf == 0: 
     clf = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3) 
    elif clf == 1: 
     clf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=5) 

classifiers = [0, 1] 

sc.parallelize(classifiers).map(lambda x: apply_classifier(x)).collect() 

我一直在使用flatMap代替map尝试,但我得到NoneType object is not iterable

我还想在apply_classifier函数中传递一个广播数据集(这是一个DataFrame)作为参数。 最后,我有可能做我想做的事情吗?有什么选择?

回答

5

is it possible to do what I am trying to do?

它不是。 Apache Spark不支持任何形式的嵌套,分布式操作只能由驱动程序初始化。这包括访问分布式数据结构,如Spark DataFrame

What are the alternatives?

这取决于许多因素,如数据的大小,可用资源的数量和算法的选择。一般而言,您有三种选择:

  • 仅使用Spark作为任务管理工具来训练本地非分布式模型。看起来你已经在某种程度上探索了这条道路。要进一步实施此方法,您可以检查spark-sklearn

    一般来说,这种方法在数据相对较小时特别有用。它的优点是多个工作之间没有竞争。

  • 使用标准的多线程工具从一个上下文中提交多个独立的作业。您可以使用例如threadingjoblib

    虽然这种方法是可能的,但我不会在实践中推荐它。并非所有Spark组件都是线程安全的,并且必须非常小心才能避免意外行为。它也使您很少控制资源分配。

  • 参数化您的Spark应用程序并使用外部管线管理器(Apache Airflow,Luigi,Toil)提交您的工作。虽然这种方法有一些缺点(它需要将数据保存到持久性存储),但它也是最普遍和最健壮的,并且对资源分配提供了很多控制。

+0

谢谢你的回答。我会检查这些外部管道经理! –