2016-11-22 85 views
0

在熊猫我有类似火花SQL距离最近的假期

indices = df.dateColumn.apply(holidays.index.searchsorted) 
df['nextHolidays'] = holidays.index[indices] 
df['previousHolidays'] = holidays.index[indices - 1] 

一个函数,它计算到最近的假期并存储作为新列的距离。

searchsortedhttp://pandas.pydata.org/pandas-docs/version/0.18.1/generated/pandas.Series.searchsorted.html对于大熊猫来说是一个很好的解决方案,因为这给了我下一个假期的索引而没有算法复杂度高的问题Parallelize pandas apply例如,这种方法比并行循环要快得多。

我该如何在火花或蜂房中实现这一点?

回答

1

这可以使用聚合来完成,但是这种方法比pandas方法具有更高的复杂度。但是您可以使用UDF实现类似的性能。它不会像大熊猫一样优雅,但:

假定该数据集的节日:

holidays = ['2016-01-03', '2016-09-09', '2016-12-12', '2016-03-03'] 
index = spark.sparkContext.broadcast(sorted(holidays)) 

而2016年的日期数据集的数据帧:

from datetime import datetime, timedelta 
dates_array = [(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(366)] 
from pyspark.sql import Row 
df = spark.createDataFrame([Row(date=d) for d in dates_array]) 

UDF可以使用熊猫searchsorted,但需要在执行者上安装熊猫。 insted的,你可以使用Python的计划是这样的:

def nearest_holiday(date): 
    last_holiday = index.value[0] 
    for next_holiday in index.value: 
     if next_holiday >= date: 
      break 
     last_holiday = next_holiday 
    if last_holiday > date: 
     last_holiday = None 
    if next_holiday < date: 
     next_holiday = None 
    return (last_holiday, next_holiday) 


from pyspark.sql.types import * 
return_type = StructType([StructField('last_holiday', StringType()), StructField('next_holiday', StringType())]) 

from pyspark.sql.functions import udf 
nearest_holiday_udf = udf(nearest_holiday, return_type) 

,可与withColumn使用:

df.withColumn('holiday', nearest_holiday_udf('date')).show(5, False) 

+----------+-----------------------+ 
|date  |holiday    | 
+----------+-----------------------+ 
|2016-01-01|[null,2016-01-03]  | 
|2016-01-02|[null,2016-01-03]  | 
|2016-01-03|[2016-01-03,2016-01-03]| 
|2016-01-04|[2016-01-03,2016-03-03]| 
|2016-01-05|[2016-01-03,2016-03-03]| 
+----------+-----------------------+ 
only showing top 5 rows 
+0

谢谢,这看起来不错。我需要将它移植到scala中;) –

+0

你指的是什么'sorted(holidays)'操作?它是一个pyspark api吗? –

+0

这是python的。它对UDF进行排序,我可以通过它来查找匹配的日期。 – Mariusz