2017-06-14 944 views
2

我有一个DF,其“产品”列中列出象下面这样:如何在dataframe spark的一列中获取列表的长度?

+----------+---------+--------------------+ 
|member_srl|click_day|   products| 
+----------+---------+--------------------+ 
|  12| 20161223| [2407, 5400021771]| 
|  12| 20161226|  [7320, 2407]| 
|  12| 20170104|    [2407]| 
|  12| 20170106|    [2407]| 
|  27| 20170104|  [2405, 2407]| 
|  28| 20161212|    [2407]| 
|  28| 20161213|  [2407, 100093]| 
|  28| 20161215|   [1956119]| 
|  28| 20161219|  [2407, 100093]| 
|  28| 20161229|   [7905970]| 
|  124| 20161011|  [5400021771]| 
|  6963| 20160101|   [103825645]| 
|  6963| 20160104|[3000014912, 6626...| 
|  6963| 20160111|[99643224, 106032...| 

如何添加一个新列product_cnt这是products列表的长度?以及如何过滤df以获得具有给定产品长度条件的指定行? 谢谢。

回答

0

第一个问题

如何添加一个新列product_cnt它们的产品列表的长度?

>>> a = [(12,20161223, [2407,5400021771]),(12,20161226,[7320,2407])] 
>>> df = spark.createDataFrame(a, 
["member_srl","click_day","products"]) 
>>> df.show() 
+----------+---------+------------------+ 
|member_srl|click_day|   products| 
+----------+---------+------------------+ 
|  12| 20161223|[2407, 5400021771]| 
|  12| 20161226|[7320, 2407, 4344]| 
+----------+---------+------------------+ 

你可以找到一个类似的例子here

>>> from pyspark.sql.types import IntegerType 
>>> from pyspark.sql.functions import udf 

>>> slen = udf(lambda s: len(s), IntegerType()) 

>>> df2 = df.withColumn("product_cnt", slen(df.products)) 
>>> df2.show() 
+----------+---------+------------------+-----------+ 
|member_srl|click_day|   products|product_cnt| 
+----------+---------+------------------+-----------+ 
|  12| 20161223|[2407, 5400021771]|   2| 
|  12| 20161226|[7320, 2407, 4344]|   3| 
+----------+---------+------------------+-----------+ 

第二个问题:给定产品长度的条件

以及如何过滤DF获得指定行?

您可以使用过滤功能docs here

>>> givenLength = 2 
>>> df3 = df2.filter(df2.product_cnt==givenLength) 
>>> df3.show() 
+----------+---------+------------------+-----------+ 
|member_srl|click_day|   products|product_cnt| 
+----------+---------+------------------+-----------+ 
|  12| 20161223|[2407, 5400021771]|   2| 
+----------+---------+------------------+-----------+ 
3

Pyspark有一个内置的功能来实现你想要什么叫sizehttp://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.size。 要将其添加为列,您可以在选择语句期间简单地调用它。

from pyspark.sql.functions import size 

countdf = df.select('*',size('products').alias('product_cnt')) 

过滤工作完全按照@ titiro89描述。此外,您可以在过滤器中使用size函数。这将允许您以下面的方式绕过添加额外的列(如果您希望这么做)。

filterdf = df.filter(size('products')==given_products_length) 
相关问题