2016-11-16 69 views
0

我有一个数据集,随着时间的推移,表明某些用户所在的地区。从这个数据集中,我想计算他们在每个位置花费的夜晚的数量。通过“度过夜晚”,我的意思是:将用户看到的最后一个位置直到某一天的23h59;如果所有从该用户观察到的位置直到第二天的05:00,或者之后的第一个,如果还没有,则与前一天的最后一天相匹配,那就是在该位置花了一晚。如何根据在窗口中固定的列值增加计数器?

| Timestamp| User| Location| 
|1462838468|49B4361512443A4DA...|1| 
|1462838512|49B4361512443A4DA...|1| 
|1462838389|49B4361512443A4DA...|2| 
|1462838497|49B4361512443A4DA...|3| 
|1465975885|6E9E0581E2A032FD8...|1| 
|1457723815|405C238E25FE0B9E7...|1| 
|1457897289|405C238E25FE0B9E7...|2| 
|1457899229|405C238E25FE0B9E7...|11| 
|1457972626|405C238E25FE0B9E7...|9| 
|1458062553|405C238E25FE0B9E7...|9| 
|1458241825|405C238E25FE0B9E7...|9| 
|1458244457|405C238E25FE0B9E7...|9| 
|1458412513|405C238E25FE0B9E7...|6| 
|1458412292|405C238E25FE0B9E7...|6| 
|1465197963|6E9E0581E2A032FD8...|6| 
|1465202192|6E9E0581E2A032FD8...|6| 
|1465923817|6E9E0581E2A032FD8...|5| 
|1465923766|6E9E0581E2A032FD8...|2| 
|1465923748|6E9E0581E2A032FD8...|2| 
|1465923922|6E9E0581E2A032FD8...|2| 

我猜我需要在这里使用Window功能,并且我用PySpark在过去其他的东西,但我在茫然,在这里开始感到有点。

回答

1

我想你到底需要有一个函数,它接受了一系列的活动和产出之夜花......类似的信息(例如刚刚的想法):

def nights_spent(location_events): 
    # location_events is a list of events that have time and location 

    location_events = sort_by_time(location_events) 

    nights = [] 

    prev_event = None 
    for event in location_events[1:]: 
     if prev_location is not None: 
      if next_day(prev_event.time, event.time) \ 
       and same_location(prev_event.location, event.location): 
       # TODO: How do you handle when prev_event 
       # and event are more than 1 day apart? 
       nights.append(prev_location) 

     prev_location = location 

    return nights 

然后,我认为一个好的第一种方法是首先按用户分组,以便为​​给定用户获得所有事件(包括位置和时间)。

然后,您可以将该事件列表提供给上面的函数,并且您将在RDD中拥有所有(user, nights_spent)行。

因此,在一般情况下,RDD看起来是这样的:

nights_spent_per_user = all_events.map(lambda x => (x.user, [(x.time, x.location)])).reduce(lambda a, b: a + b).map(x => (x[0], nights_spent(x[1]))) 

希望有所帮助,让您开始。

+0

非常感谢!我需要将我的头围绕在这,但它似乎是一个非常好的开始。 –

相关问题