The purpose of this code is to run some logic through the `myFunc` method on an RDD, to get the benefit of parallelization. (Passing functions in pyspark.)
The following lines:

df_rdd = ParallelBuild().run().map(lambda line: line).persist()
r = df_rdd.map(ParallelBuild().myFunc)

give me exit 0. Reading around on Google I gathered that Spark is lazily evaluated, so some action is needed to trigger the effect, and I added:

r.count()

which gives me:

TypeError: 'JavaPackage' object is not callable
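For context, Spark transformations such as map() only build up a pipeline; nothing executes until an action like count() consumes it. The same lazy pattern can be sketched in plain Python without Spark, since Python 3's built-in map is also lazy:

```python
# Lazy-evaluation sketch (plain-Python analogy, no Spark needed).
# map() builds a pipeline but does no work until something consumes it;
# the sum() below plays the role of a Spark action like count().
calls = []

def record(x):
    calls.append(x)
    return x

lazy = map(record, [1, 2, 3])    # "transformation": nothing executed yet
assert calls == []               # no side effects so far
count = sum(1 for _ in lazy)     # "action": forces evaluation
assert count == 3
assert calls == [1, 2, 3]
```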
Something to note: r = df_rdd.map(ParallelBuild().myFunc) yields a PipelinedRDD. Not sure what that is, but it looks like some transformation? When I put

data = [(1,'a'), (1,'b'), (1,'c'), (2,'d'), (3,'r'), (4,'a'), (2,'t'), (3,'y'), (1,'f')]
df = sqlContext.createDataFrame(data, schema=['uid', 'address_uid'])

directly in my main function, things work fine. But obviously I lose the modular part of my code.
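On the modularity point: Spark pickles whatever function is passed to map(), and ParallelBuild().myFunc is a bound method, so the whole instance gets shipped along with it. One way to keep the logic in a separate module is a plain function; my_func below is a hypothetical standalone rewrite of myFunc, not part of the original code:

```python
# Sketch: keeping the per-record logic modular without passing a bound method.
# A module-level function travels alone when Spark serializes it, whereas
# ParallelBuild().myFunc drags its whole instance along.
def my_func(s):
    # Same logic as myFunc: split a comma-separated string, return the first field.
    l = s.split(',')
    return l[0]

# Works on any comma-separated string, with or without Spark:
assert my_func("1,a") == "1"
```

With this, the driver code could call df_rdd.map(my_func) instead of df_rdd.map(ParallelBuild().myFunc).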
Code:
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext
import csv, io, StringIO
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql import *
from pyspark.sql import functions as F
from pyspark.sql.functions import asc, desc
sc = SparkContext("local", "Summary Report")
sqlContext = SQLContext(sc)
class ParallelBuild(object):

    def myFunc(self, s):
        l = s.split(',')
        print l[0], l[1]
        return l[0]

    def list_to_csv_str(x):
        output = StringIO.StringIO("")
        csv.writer(output).writerow(x)
        return output.getvalue().strip()

    def run(self):
        data = [(1,'a'), (1,'b'), (1,'c'), (2,'d'), (3,'r'), (4,'a'), (2,'t'), (3,'y'), (1,'f')]
        df = sqlContext.createDataFrame(data, schema=['uid', 'address_uid'])
        return df

if __name__ == "__main__":
    df_rdd = ParallelBuild().run().map(lambda line: line).persist()
    r = df_rdd.map(ParallelBuild().myFunc)
    r.count()
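Separate from the JavaPackage error, note that run() returns a DataFrame, and the elements flowing through map() are pyspark Row objects (tuple subclasses with named fields), not comma-separated strings, so s.split(',') in myFunc would not apply to them. A Spark-free sketch using a namedtuple as a stand-in for Row:

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row, which is itself a tuple subclass.
Row = namedtuple("Row", ["uid", "address_uid"])
row = Row(1, "a")

failed = False
try:
    row.split(",")        # what myFunc effectively attempts on each row
except AttributeError:    # tuples have no split(); only strings do
    failed = True
assert failed
assert row.uid == 1 and row.address_uid == "a"  # fields are accessed by name instead
```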