
Finding unique tuples: I have user activity data from a shopping platform as a PySpark RDD of the form:

user_id | product_id | event (product viewed, purchased, added to cart, etc.)

The thing is, there can be multiple event types for the same (user_id, product_id) tuple. I would like to collect all of these events into a single row.

Example:

╔═══════════════════════════════════╗
║ user_id | product_id | Event      ║
╠═══════════════════════════════════╣
║ 1       | 1          | viewed     ║
║ 1       | 1          | purchased  ║
║ 2       | 1          | added      ║
║ 2       | 2          | viewed     ║
║ 2       | 2          | added      ║
╚═══════════════════════════════════╝

I would like:

╔════════════════════════════════════════════╗
║ user_id | product_id | Event               ║
╠════════════════════════════════════════════╣
║ 1       | 1          | {viewed, purchased} ║
║ 2       | 1          | {added}             ║
║ 2       | 2          | {viewed, added}     ║
╚════════════════════════════════════════════╝

Have you looked at using the built-in 'map' and 'groupByKey' functions? – jtmingus

Answers


In Scala it would look something like this:

// key each record by (user_id, product_id) and group the events per key (assuming Int ids and String events)
val grouped: RDD[((Int, Int), Iterable[String])] = rdd.map(triplet => ((triplet._1, triplet._2), triplet._3)).groupByKey()
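
Since the question is about PySpark, here is a minimal sketch of the same map/groupByKey approach in Python; it assumes rdd holds (user_id, product_id, event) triples just like the Scala version:

# key each record by (user_id, product_id), then group the events per key
grouped = rdd.map(lambda t: ((t[0], t[1]), t[2])).groupByKey()
# mapValues(set) materializes each grouped iterable as a set of events
grouped.mapValues(set).collect()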

If you would like to try it with DataFrames, take a look at this:

import pyspark.sql.functions as F

# sample data: (user_id, product_id, event)
rdd = sc.parallelize([[1, 1, 'viewed'], [1, 1, 'purchased'], [2, 1, 'added'], [2, 2, 'viewed'], [2, 2, 'added']])
df = rdd.toDF(['user_id', 'product_id', 'Event'])

# collect the distinct events for each (user_id, product_id) pair
df.groupby(['user_id', 'product_id']).agg(F.collect_set("Event")).show()
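
Note that collect_set returns the distinct events as an array with no guaranteed order; use collect_list instead if duplicate events should be kept. A small variation of the same query that only names the aggregated column and prints it untruncated:

# same aggregation with a friendlier column name and full-width output
df.groupby(['user_id', 'product_id']).agg(F.collect_set("Event").alias("Events")).show(truncate=False)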

If you prefer to stay with RDDs, it looks like this:

rdd = sc.parallelize([[1, 1, 'viewed'], [1, 1, 'purchased'], [2, 1, 'added'], [2, 2, 'viewed'], [2, 2, 'added']])
# group by (user_id, product_id) and collect the events for each pair (a list comprehension works in both Python 2 and 3)
rdd.groupBy(lambda x: (x[0], x[1])).map(lambda x: (x[0][0], x[0][1], [e[2] for e in x[1]])).collect()