2017-05-31 91 views
1

我有时间的访问,时间戳记这样如何查找连续日期的最长序列?

ID, time 
1, 1493596800 
1, 1493596900 
1, 1493432800 
2, 1493596800 
2, 1493596850 
2, 1493432800 

我用火花SQL数据库,我需要有像

ID, longest_seq (days) 
1, 2 
2, 5 
3, 1 

每个ID consecutives的最长序列中的日期我试着去适应这个回答Detect consecutive dates ranges using SQL对我来说,但我没有达到我的期望。

SELECT ID, MIN (d), MAX(d) 
    FROM (
     SELECT ID, cast(from_utc_timestamp(cast(time as timestamp), 'CEST') as date) AS d, 
       ROW_NUMBER() OVER(
     PARTITION BY ID ORDER BY cast(from_utc_timestamp(cast(time as timestamp), 'CEST') 
                  as date)) rn 
     FROM purchase 
     where ID is not null 
     GROUP BY ID, cast(from_utc_timestamp(cast(time as timestamp), 'CEST') as date) 
    ) 
    GROUP BY ID, rn 
    ORDER BY ID 

如果有人对如何解决这一要求,或有什么错在它的一些线索,我将不胜感激帮助 感谢

[编辑]一个更明确的输入/输出

ID, time 
1, 1 
1, 2 
1, 3 
2, 1 
2, 3 
2, 4 
2, 5 
2, 10 
2, 11 
3, 1 
3, 4 
3, 9 
3, 11 

其结果将是:

ID, MaxSeq (in days) 
1,3 
2,3 
3,1 

所有的访问是在时间戳,但我需要连续几天,然后每天每次访问一天一次地计算在内

+0

你能给出更具代表性的意见吗?我认为输入数据集不匹配结果。 –

回答

0

这就是我喜欢的窗口集合函数的情况!

我想下面的例子可以帮助你(至少开始)。

以下是我使用的数据集。我将您的时间(长时间)转换为数字时间来表示一天(并避免在Spark SQL中使用时间戳,这可能会使解决方案难以理解...... 可能)。

scala> visits.show 
+---+----+ 
| ID|time| 
+---+----+ 
| 1| 1| 
| 1| 1| 
| 1| 2| 
| 1| 3| 
| 1| 3| 
| 1| 3| 
| 2| 1| 
| 3| 1| 
| 3| 2| 
| 3| 2| 
+---+----+ 

让我们定义窗口规范,将id行分组在一起。

import org.apache.spark.sql.expressions.Window 
val idsSortedByTime = Window. 
    partitionBy("id"). 
    orderBy("time") 

用那个你rank行和具有相同等级的计数行。

val answer = visits. 
    select($"id", $"time", rank over idsSortedByTime as "rank"). 
    groupBy("id", "time", "rank"). 
    agg(count("*") as "count") 
scala> answer.show 
+---+----+----+-----+ 
| id|time|rank|count| 
+---+----+----+-----+ 
| 1| 1| 1| 2| 
| 1| 2| 3| 1| 
| 1| 3| 4| 3| 
| 3| 1| 1| 1| 
| 3| 2| 2| 2| 
| 2| 1| 1| 1| 
+---+----+----+-----+ 

出现(非常接近?)解决方案。 你似乎完成了!

+0

感谢您的帮助 我不确定按ID,时间和等级计数的使用情况,它代表按ID编号的访问次数,日期?那么为什么我们应该使用排名功能呢? – Farah

1

我在下面的答案是从https://dzone.com/articles/how-to-find-the-longest-consecutive-series-of-even改编的,用于Spark SQL。你必须与包裹SQL查询:

spark.sql(""" 
SQL_QUERY 
""") 

因此,对于第一个查询:

CREATE TABLE intermediate_1 AS 
SELECT 
    id, 
    time, 
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY time) AS rn, 
    time - ROW_NUMBER() OVER (PARTITION BY id ORDER BY time) AS grp 
FROM purchase 

这会给你:

id, time, rn, grp 
1, 1, 1, 0 
1, 2, 2, 0 
1, 3, 3, 0 
2, 1, 1, 0 
2, 3, 2, 1 
2, 4, 3, 1 
2, 5, 4, 1 
2, 10, 5, 5 
2, 11, 6, 5 
3, 1, 1, 0 
3, 4, 2, 2 
3, 9, 3, 6 
3, 11, 4, 7 

我们可以看到,连续行具有相同的grp值。然后我们将使用GROUP BY和COUNT来获取连续时间的数量。

CREATE TABLE intermediate_2 AS 
SELECT 
    id, 
    grp, 
    COUNT(*) AS num_consecutive 
FROM intermediate_1 
GROUP BY id, grp 

这将返回:

id, grp, num_consecutive 
1, 0, 3 
2, 0, 1 
2, 1, 3 
2, 5, 2 
3, 0, 1 
3, 2, 1 
3, 6, 1 
3, 7, 1 

现在我们只是使用MAX和GROUP BY获得的连续时间的最大数量。

CREATE TABLE final AS 
SELECT 
    id, 
    MAX(num_consecutive) as max_consecutive 
FROM intermediate_2 
GROUP BY id 

,这将给你:

id, max_consecutive 
1, 3 
2, 3 
3, 1 

希望这有助于!