2016-07-24 66 views
0

我正在做这个MOOC以进行火花刷新,并遇到了这个问题 “在先前创建的数据框中找到唯一主机的号码” Apache日志分析)获取列中唯一行数的最优化方法火花数据帧

数据帧看起来像这样

--------------------+--------------------+------+------------+--------------------+ 
|    host|    path|status|content_size|    time| 
+--------------------+--------------------+------+------------+--------------------+ 
| in24.inetnebr.com |/shuttle/missions...| 200|  1839|1995-08-01 00:00:...| 
| uplherc.upl.com |     /| 304|   0|1995-08-01 00:00:...| 
| uplherc.upl.com |/images/ksclogo-m...| 304|   0|1995-08-01 00:00:...| 
| uplherc.upl.com |/images/MOSAIC-lo...| 304|   0|1995-08-01 00:00:...| 
| uplherc.upl.com |/images/USA-logos...| 304|   0|1995-08-01 00:00:...| 
|ix-esc-ca2-07.ix....|/images/launch-lo...| 200|  1713|1995-08-01 00:00:...| 
| uplherc.upl.com |/images/WORLD-log...| 304|   0|1995-08-01 00:00:...| 
|slppp6.intermind....|/history/skylab/s...| 200|  1687|1995-08-01 00:00:...| 
|piweba4y.prodigy....|/images/launchmed...| 200|  11853|1995-08-01 00:00:...| 
|slppp6.intermind....|/history/skylab/s...| 200|  9202|1995-08-01 00:00:...| 
|slppp6.intermind....|/images/ksclogosm...| 200|  3635|1995-08-01 00:00:...| 
|ix-esc-ca2-07.ix....|/history/apollo/i...| 200|  1173|1995-08-01 00:00:...| 
|slppp6.intermind....|/history/apollo/i...| 200|  3047|1995-08-01 00:00:...| 
| uplherc.upl.com |/images/NASA-logo...| 304|   0|1995-08-01 00:00:...| 
|  133.43.96.45 |/shuttle/missions...| 200|  10566|1995-08-01 00:00:...| 
|kgtyk4.kj.yamagat...|     /| 200|  7280|1995-08-01 00:00:...| 
|kgtyk4.kj.yamagat...|/images/ksclogo-m...| 200|  5866|1995-08-01 00:00:...| 
| d0ucr6.fnal.gov |/history/apollo/a...| 200|  2743|1995-08-01 00:00:...| 

现在,我已经试过3种方法找到没有唯一的主机

from pyspark.sql import functions as func 
unique_host_count = logs_df.agg(func.countDistinct(col("host"))).head()[0] 

这个运行在约0.72秒

unique_host_count = logs_df.select("host").distinct().count() 

这个运行在0.57秒

unique_host = logs_df.groupBy("host").count() 
unique_host_count = unique_host.count() 

这个运行在0.62秒

所以我的问题是有没有什么比第二个更好的选择,我认为distinct是一个昂贵的操作,但它原来是最快的

The data frame I am using have 

1043177 rows 

spark version - 1.6.1 
cluster -6 gb memory 

回答

3

这不是低效的。因为它确定每个分区的所有唯一值(不要忘记您的数据在多个节点中分割)。之后,它会比较这些值并在所有节点中选取所有不同的值,并且这很容易并行化。另一方面,当您对数据进行分组时,Spark会对数据进行混洗,而且这样更加昂贵,因为在大多数情况下,您必须密集使用网络。

0

我怀疑这将导致洗牌也,但它是一个选项

logs_df.dropDuplicates("host").count 
1

您使用使用地图&减少操作完成它:

unique_host_count = logs_df.select("host")\ 
.map(lambda x: (x, 1))\ 
.reduceByKey(lambda x, y: x+y)\ 
.count()