2016-05-31 143 views
1

该代码的目的是基于在RDD上运行的“myFunc”方法加载计算一些逻辑以获得并行化优势。pyspark中的传递函数

下列行: df_rdd = ParallelBuild()运行()地图(拉姆达线:线)。.persist() R = df_rdd.map(ParallelBuild()myFunc的)

给我出口0阅读谷歌认为,Spark是懒惰的评估,使一些操作会触发该效果,我补充一下:

r.count()给我:

TypeError: 'JavaPackage' object is not callable 

注意的事情是: R = df_rdd。地图(Parall elBuild()。myFunc)

给“pipelinedrdd”不确定那是什么,但看起来像一些转变?数据= [(1,'a'),(1,'b'),(1,'c'),(2,'d'), )(3,'r'),(4,'a'),(2,'t'),(3,'y'),(1,'f')] df = sqlContext.createDataFrame ,schema = ['uid','address_uid'])

直接在我的主要功能,然后事情工作得很好。但显然我松开了我的代码的模块化部分。

代码:

from pyspark import SparkContext 
from pyspark.sql import SQLContext, HiveContext 
import csv, io, StringIO 
from pyspark.sql.functions import * 
from pyspark.sql import Row 
from pyspark.sql import * 
from pyspark.sql import functions as F 
from pyspark.sql.functions import asc, desc 
sc = SparkContext("local", "Summary Report") 
sqlContext = SQLContext(sc) 


class ParallelBuild(object): 

def myFunc(self, s): 
    l = s.split(',') 
    print l[0], l[1] 
    return l[0] 


def list_to_csv_str(x): 
    output = StringIO.StringIO("") 
    csv.writer(output).writerow(x) 
    return output.getvalue().strip() 

def run(self): 
    data = [(1,'a'), (1,'b'), (1,'c'), (2,'d'), (3,'r'), (4,'a'), (2,'t'), (3,'y'), (1,'f')] 
    df = sqlContext.createDataFrame(data, schema= ['uid', 'address_uid']) 
    return df 


if __name__ == "__main__": 

    df_rdd = ParallelBuild().run().map(lambda line: line).persist() 
    r = df_rdd.map(ParallelBuild().myFunc) 
    r.count() 

回答

0

好了,你的主要问题是“为什么没有打印任何东西”答案有两部分。

  1. 在分布式计算中你不能真的print。所以你的功能myFunc不会打印任何东西给驱动程序。其原因相当复杂,所以我会指导您进入this page,以获取更多关于为什么打印在Spark中无法正常工作的信息。

然而,呼吁r.count()应该打印出来9。为什么这不起作用?

  1. 您的功能myFunc没有多大意义。当你在r = df_rdd.map(ParallelBuild().myFunc)中打电话时,你想通过df_rdd,我想。但这已经是一个DataFrame。此DataFrame的每一行都是类型,如果您致电df_rdd.first(),您将获得Row(uid=1, address_uid=u'a')。你在做什么myFunc是试图使用split,但split是用于字符串对象,并且你有对象。我不知道为什么这不是抛出一个错误,但你根本不能在对象上调用split。考虑更多沿着r = df_rdd.map(lambda x: x[0])的路线。

因此,我认为r.count()不起作用,因为当您拨打myFunc时,有些事情变得混乱。


旁注:

df_rdd = ParallelBuild().run().map(lambda line: line).persist()。运行.map(lambda line: line)不会执行任何操作。您没有对line进行任何更改,因此请勿运行map作业。代码为df_rdd = ParallelBuild().run().persist()