2017-04-04 100 views

pySpark: how to access the values of a tuple in a (key, tuple) RDD (Python)

I am trying to access the values contained in a PipelineRDD. Here is what I start with:

1. The RDD is (key, code, value):

data = [(11720, (u'I50800', 0.08229813664596274)), (11720, (u'I50801', 0.03076923076923077))] 

2. I need to group it by the first value and turn it into (key, tuple), where tuple = (code, value):

testFeatures = lab_FeatureTuples = labEvents.select('ITEMID', 'SUBJECT_ID', 'NORM_ITEM_CNT') \
    .orderBy('SUBJECT_ID', 'ITEMID') \
    .rdd.map(lambda (ITEMID, SUBJECT_ID, NORM_ITEM_CNT): (SUBJECT_ID, (ITEMID, NORM_ITEM_CNT))) \
    .groupByKey()

testFeatures = [(11720, [(u'I50800', 0.08229813664596274), (u'I50801', 0.03076923076923077)])] 
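Note that `groupByKey` actually yields a lazy `ResultIterable` per key, so `mapValues(list)` is needed to see the list shown above. Without a Spark cluster at hand, the grouping itself can be sketched in plain Python (using the sample `data` from above):

```python
from collections import defaultdict

# Plain-Python sketch of the (key, (code, value)) -> (key, [(code, value), ...])
# transformation that .groupByKey() performs on the RDD.
data = [(11720, (u'I50800', 0.08229813664596274)),
        (11720, (u'I50801', 0.03076923076923077))]

grouped = defaultdict(list)
for key, code_value in data:
    grouped[key].append(code_value)

testFeatures = list(grouped.items())
# testFeatures == [(11720, [(u'I50800', 0.08229813664596274),
#                           (u'I50801', 0.03076923076923077)])]
```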

From each tuple = (code, value), I want to do the following:

Create a SparseVector out of it, so I can use it for an SVM model.

result.take(1)


Please reformat your code –

Answer


Here is one way to do it:

import pyspark 
import pyspark.sql.functions as sf 
import pyspark.sql.types as sparktypes 
sc = pyspark.SparkContext() 
sqlc = pyspark.SQLContext(sc) 

data = [(11720, (u'I50800', 0.08229813664596274)), 
     (11720, (u'I50801', 0.03076923076923077))] 
rdd = sc.parallelize(data) 

df = sqlc.createDataFrame(rdd, ['idx', 'tuple']) 
df.show() 

which gives:

+-----+--------------------+ 
|  idx|               tuple| 
+-----+--------------------+ 
|11720|[I50800,0.0822981...| 
|11720|[I50801,0.0307692...| 
+-----+--------------------+ 

Now define pyspark user-defined functions:

extract_tuple_0 = sf.udf(lambda x: x[0], returnType=sparktypes.StringType()) 
extract_tuple_1 = sf.udf(lambda x: x[1], returnType=sparktypes.FloatType()) 
df = df.withColumn('tup0', extract_tuple_0(sf.col('tuple'))) 

df = df.withColumn('tup1', extract_tuple_1(sf.col('tuple'))) 
df.show() 

which gives:

+-----+--------------------+------+----------+ 
|  idx|               tuple|  tup0|      tup1| 
+-----+--------------------+------+----------+ 
|11720|[I50800,0.0822981...|I50800|0.08229814| 
|11720|[I50801,0.0307692...|I50801|0.03076923| 
+-----+--------------------+------+----------+
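The asker's end goal was a SparseVector per key for an SVM. A minimal plain-Python sketch of how the (code, value) pairs map onto the (size, indices, values) arguments that `pyspark.mllib.linalg.SparseVector` takes; the vocabulary `code_to_idx` here is a hypothetical example, and in practice it would be built from the distinct ITEMID values:

```python
# Hypothetical vocabulary mapping each lab code to a feature index.
code_to_idx = {u'I50800': 0, u'I50801': 1}

# The grouped (code, value) list for one key, from the data above.
features = [(u'I50800', 0.08229813664596274),
            (u'I50801', 0.03076923076923077)]

# SparseVector is constructed from (size, indices, values).
size = len(code_to_idx)
indices = [code_to_idx[code] for code, _ in features]
values = [value for _, value in features]

# With pyspark available, the last step would be:
# from pyspark.mllib.linalg import SparseVector
# sv = SparseVector(size, indices, values)
```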