2014-10-29 70 views
0

我想统计哪个用户查看哪个类别的频率。我是新手SparkPython。这里是演示数据:火花python减键

dataSource = sc.parallelize([("user1", "film"), ("user1", "film"), ("user2", "film"), ("user2", "books"), ("user2", "books")]) 

我减少了关键用户和收集所有类别。然后我分裂后数:

dataReduced = dataSource.reduceByKey(lambda x,y : x + "," + y) 
catSplitted = dataReduced.map(lambda (user,values) : (values.split(","),user)) 

每个用户的输出格式是这样的 - >([CAT1,CAT1,CAT2,catn],用户)

可能有人请告诉我怎么算用SparkPython或者你有不同的方法来解决这个问题?

回答

0

纯Python:

ds = [('user1',['film','film','books']), ('user2',['film','books','books'])] 
ds1 = map(lambda (x,y):(x,tuple(set((z,y.count(z)) for z in y))),ds) 
print ds1 

回报:

[('user1', (('film', 2), ('books', 1))), ('user2', (('film', 1), ('books', 2)))] 

火花,它应该是如下(不知道因为我没有获得现在的火花):

dataReduced = dataSource.reduceByKey(lambda x,y : x + "," + y) 
catSplitted = dataReduced.map(lambda (user,values) : (user, values.split(",")) 
catCounted = catSplitted.map(lambda (x,y):(x,tuple(set((z,y.count(z)) for z in y))) 

希望这会有所帮助。如果没有,您可以尝试使用spark命令检查如何获取python功能。基本逻辑应该可以工作

+0

谢谢,这似乎工作。你能否简单地解释我的最后一个lambda函数。哪个变量表示哪个值 - lambda(x,y):(x,tuple(set((z,y.count(z))for z)y) – user3756702 2014-10-30 09:30:41

+0

我创建了一个元组的元组, ('电影','电影','电影','电影','书籍')成为(('电影',2),('电影',1))。然后设置删除重复,然后将设置转换回元组。 – haraprasadj 2014-10-30 09:34:21

1

现在我已经得到了我期望的结果。但是,我猜想像我这样连接钥匙并不好。也许有人有另一种解决方案或任何建议?

# count the categorie views per user 
# data 
dataSource = sc.parallelize([("user1", "film"), ("user1", "film"), ("user2", "film"), ("user2", "books"), ("user2", "books")]) 
# Create Key,Value | concatenate user and category as key 
dataKeyValue = dataSource.map(lambda (user,category) : (user+","+category, 1)) 
# reduce 
dataReduced = dataKeyValue.reduceByKey(lambda x,y : x + y) 
# result => [('user2,books', 2), ('user1,film', 2), ('user2,film', 1)] 
# split key 
cleanResult = dataReduced.map(lambda (key,value) : (key.split(","),value)) 
0

另一个(更高效和易读的IMO)。 由于您的SPARK DAG不需要在基于用户的分区后收集重新分区类别,并且易于用户使用,因为它使用数据框而不是RDD。

首先,基于用户和类别的哈希列:

import pyspark.sql.functions as F 
df = spark.createDataFrame([("u1", "f"), ("u1", "f"), ("u2", "f"), ("u2", "b"), ("u2", "b")], schema=['u', 'c']) 
df = df.withColumn('hash', f.hash()) 

其次,我们通过散列分区,并在本地汇总:

from pyspark.sql import Window 
win = Window.partitionBy('hash') 
df.withColumns('count', F.count('hash').over(win)).distinct().show()