2017-07-03 93 views
0

排序名单上有一个数据帧,其中列有一个称为stopped是:UDF在pyspark

+--------------------+ 
|    stopped| 
+--------------------+ 
|[nintendo, dsi, l...| 
|[nintendo, dsi, l...| 
| [xl, honda, 500]| 
|[black, swan, green]| 
|[black, swan, green]| 
|[pin, stripe, sui...| 
| [shooting, braces]| 
|  [haus, geltow]| 
|[60, cm, electric...| 
| [yamaha, yl1, yl2]| 
|[landwirtschaft, ...| 
|  [wingbar, 9581]| 
|  [gummi, 16mm]| 
|[brillen, lupe, c...| 
|[man, city, v, ba...| 
|[one, plus, one, ...| 
|  [kapplocheisen]| 
|[tractor, door, m...| 
|[pro, nano, flat,...| 
|[kaleidoscope, to...| 
+--------------------+ 

我想创建一个包含相同的列表,但这里的关键词是有序另一列。

据我了解,我需要创建一个UDF是需要返回一个列表:

udf_sort = udf(lambda x: x.sort(), ArrayType(StringType())) 
ps_clean.select("*", udf_sort(ps_clean["stopped"])).show(5, False) 

,我也得到:

+---------+----------+---------------------+------------+--------------------------+--------------------------+-----------------+ 
|client_id|kw_id  |keyword    |max_click_dt|tokenized     |stopped     |<lambda>(stopped)| 
+---------+----------+---------------------+------------+--------------------------+--------------------------+-----------------+ 
|710  |4304414582|nintendo dsi lite new|2017-01-06 |[nintendo, dsi, lite, new]|[nintendo, dsi, lite, new]|null    | 
|705  |4304414582|nintendo dsi lite new|2017-03-25 |[nintendo, dsi, lite, new]|[nintendo, dsi, lite, new]|null    | 
|707  |647507047 |xl honda 500 s  |2016-10-26 |[xl, honda, 500, s]  |[xl, honda, 500]   |null    | 
|710  |26308464 |black swan green  |2016-01-01 |[black, swan, green]  |[black, swan, green]  |null    | 
|705  |26308464 |black swan green  |2016-07-13 |[black, swan, green]  |[black, swan, green]  |null    | 
+---------+----------+---------------------+------------+--------------------------+--------------------------+-----------------+ 

为什么排序不适用?

回答

2

x.sort()通常排序到位名单(但我怀疑它不会做在一个pyspark数据框中),它返回None。这就是你的专栏标签<lambda>(stopped)的所有null的值。 sorted(x)将对列表进行排序并返回新的排序副本。所以,用

udf_sort = udf(lambda x: sorted(x), ArrayType(StringType())) 

应该可以解决你的问题。

或者,您可以使用内置函数sort_array而不是定义您自己的udf。

from pyspark.sql.functions import sort_array 

ps_clean.select("*", sort_array(ps_clean["stopped"])).show(5, False) 

这种方法是干净了一点,实际上,你可以期望得到一些性能提升,因为pyspark没有序列化你的UDF。