1
我是PySpark的新品牌,我试图转换一些派生新变量'COUNT_IDX'的python代码。新变量的初始值为1,但在条件满足时会增加1。否则,新变量值将与最后一条记录中的值相同。PySpark条件增量
条件递增是当: TRIP_CD不等于先前记录TRIP_CD 或 SIGN不等于先前记录SIGN 或 time_diff不等于1
Python代码(熊猫数据帧):
df['COUNT_IDX'] = 1
for i in range(1, len(df)):
if ((df['TRIP_CD'].iloc[i] != df['TRIP_CD'].iloc[i - 1])
or (df['SIGN'].iloc[i] != df['SIGN'].iloc[i-1])
or df['time_diff'].iloc[i] != 1):
df['COUNT_IDX'].iloc[i] = df['COUNT_IDX'].iloc[i-1] + 1
else:
df['COUNT_IDX'].iloc[i] = df['COUNT_IDX'].iloc[i-1]
这是预期的结果:
TRIP_CD SIGN time_diff COUNT_IDX
2711 - 1 1
2711 - 1 1
2711 + 2 2
2711 - 1 3
2711 - 1 3
2854 - 1 4
2854 + 1 5
在PySpark,我初始化COUNT_IDX为1。然后使用Window功能,我把TRIP_CD和SIGN的滞后和计算的time_diff,然后尝试:
df = sqlContext.sql('''
select TRIP, TRIP_CD, SIGN, TIME_STAMP, seconds_diff,
case when TRIP_CD != TRIP_lag or SIGN != SIGN_lag or seconds_diff != 1
then (lag(COUNT_INDEX) over(partition by TRIP order by TRIP, TIME_STAMP))+1
else (lag(COUNT_INDEX) over(partition by TRIP order by TRIP, TIME_STAMP))
end as COUNT_INDEX from df''')
这是给我喜欢的东西:
TRIP_CD SIGN time_diff COUNT_IDX
2711 - 1 1
2711 - 1 1
2711 + 2 2
2711 - 1 2
2711 - 1 1
2854 - 1 2
2854 + 1 2
如果在先前记录上更新COUNT_IDX,则当前记录上的COUNT_IDX不会识别要计算的更改。这就像COUNTI_IDX没有被覆盖,或者它不是按行进行评估。有关我如何解决此问题的任何想法?
这是一个创造性的解决方案,但是,我还没有完全得到它的工作还没有。你把这个在withColumn语句中创建一个新的累积总和列或这应该是SQL?谢谢! – Amber
这是为了替代'case when'和'end'之间的SQL查询。如果您愿意,可以内联窗口定义。由于数据中有一些缺失的列,所以显示它只是伪代码。 – zero323