2016-11-21 108 views
2

用户I有一个火花数据帧象下面这样:创建每一行的每个项目于火花数据帧

User Item Purchased 
1 A 1 
1 B 2 
2 A 3 
2 C 4 
3 A 3 
3 B 2 
3 D 6 

only showing top 5 rows 

每个用户具有用于他们所购买的项目的行。假设Purhcased是多少数量。购买(计数)。

但是,有些用户可能没有购买的项目,因此对于该项目,该特定用户没有一行。我们只有用于购买用户的物品的行。所以如果用户1已经购买了物品A,B,那么对于这两个物品,我们有2行用于用户1。但是,如果用户2购买了A,C,那么用户2拥有行A和C,但没有行B.我希望最终每个用户都应该拥有表中所有项目的所有行,并且每个行都有相应的计数。

我想将这个数据框转换成上面的数据框,但也有行用户没有看到的项目,并给相应的计数为零。

象下面这样:我认为

User Item Purchased 
1 A 1 
1 B 2 
1 C 0 
1 D 0 
2 A 3 
2 C 4 
2 B 0 
2 D 0 
3 A 3 
3 B 2 
3 D 6 
3 C 0 
only showing top 5 rows 

的一种方式是,在火花如果我使用sqlContext的cross_tab方法将第一数据帧上然后我可以每一行转换为柱,用相应的值。对于用户没有的项目,它会为相同的项目创建一个列,并将其设置为零。

但如何将这些列转换回行?这也可能是一个迂回的方式。

感谢

回答

1
df = sqlContext.createDataFrame([(1, 'A', 2), (1, 'B', 3), (2, 'A', 2)], ['user', 'item', 'purchased']) 
pivot = df.groupBy('user').pivot('item').sum('purchased').fillna(0) 
items = [i['item'] for i in df.select('item').distinct().collect()] 
flattened_rdd = pivot.rdd.flatMap(lambda x: [(x['user'], i, x[i]) for i in items]) 
sqlContext.createDataFrame(flattened_rdd, ["user", "item", "purchased"]).show() 
+0

什么是purchase_count变量在这里?此外为什么要平均在那里?我需要数据的实际计数 – Baktaawar

+0

好吧,这段代码需要很长时间才能运行。我在我的笔记本电脑上运行它的火花,但它太耗时 – Baktaawar

+0

@Baktaawar我命名为purchase_count购买。编辑我的代码以反映这一点。不知道为什么花了这么长时间。我用我的笔记本电脑在4秒内运行了1249956行。这里是创建示例输入的代码。输出= [] 字母=列表(字符串.uppercase) 我在范围内(1,100000): mod = i%len(字母) 对于范围内的j(0,mod): output.append (我,字母[j],1))' – None

1

我们可以通过只使用仅df功能,以及实现这一目标。

orders = [(1,"A",1),(1,"B",2),(2,"A",3),(2,"C",4),(3,"A",3),(3,"B",2),(3,"D",6)] 
df = sqlContext.createDataFrame(orders, ["user","item","purchased"]) 
df_items = df.select("item").distinct().repartition(5).withColumnRenamed("item", "item_1") 
df_users = df.select("user").distinct().repartition(5).withColumnRenamed("user", "user_1") 
df_cartesian = df_users.join(df_items) 
//above expression returns cartesian product of users and items dfs 
joined_df = df_cartesian.join(df, [df_cartesian.user_1==df.user, df_cartesian.item_1==df.item], "left_outer").drop("user").drop("item") 
result_df = joined_df.fillna(0,["purchased"]).withColumnRenamed("item_1", "item").withColumnRenamed("user_1", "user") 

最后,result_df.show()产生如下图所示愿望输出:

+----+----+---------+ 
|user|item|purchased| 
+----+----+---------+ 
| 2| A|  3| 
| 2| B|  0| 
| 2| C|  4| 
| 2| D|  0| 
| 3| A|  3| 
| 3| B|  2| 
| 3| C|  0| 
| 3| D|  6| 
| 1| A|  1| 
| 1| B|  2| 
| 1| C|  0| 
| 1| D|  0| 
+----+----+---------+ 
+0

我需要重新分区吗? – Baktaawar

+0

是的,否则'df_users.join(df_items)'创建'200 * 200'任务。其中200是默认的分区数量。 – avr

+0

问题是它没有运行。超过12小时运行,甚至没有结果。我有300K obs。 – Baktaawar

相关问题