
Adding 12 hours to a datetime column in Spark

I tried searching, but could only find the add_months function in Spark SQL, so I am opening a new thread here. Any help would be greatly appreciated.

I want to add 12, 24, and 48 hours to a date column in Spark SQL using sqlContext. I am on Spark version 1.6.1, and I need something like this:

SELECT N1.subject_id, '12-HOUR' AS notes_period, N1.chartdate_start, N2.chartdate, N2.text 
FROM NOTEEVENTS N2, 
(SELECT subject_id, MIN(chartdate) chartdate_start 
    FROM NOTEEVENTS 
    WHERE subject_id = 283 
    AND category != 'Discharge summary' 
GROUP BY subject_id) N1 
WHERE N2.subject_id = N1.subject_id 
and n2.chartdate < n1.chartdate_start + interval '1 hour' * 12 

Note the last condition: it is written in PostgreSQL, and that is exactly what I need in Spark SQL. I would really appreciate any help I can get.

Thanks.

Answers


There is no such function at the moment, but you can write a UDF:

import java.sql.Timestamp

// Register a UDF that shifts a timestamp by the given number of hours.
sqlContext.udf.register("add_hours", (datetime: Timestamp, hours: Int) => {
    new Timestamp(datetime.getTime() + hours * 60 * 60 * 1000)
})

For example:

SELECT N1.subject_id, '12-HOUR' AS notes_period, N1.chartdate_start, N2.chartdate, N2.text 
    FROM NOTEEVENTS N2, 
    (SELECT subject_id, MIN(chartdate) chartdate_start 
     FROM NOTEEVENTS 
     WHERE subject_id = 283 
     AND category != 'Discharge summary' 
    GROUP BY subject_id) N1 
    WHERE N2.subject_id = N1.subject_id 
    and n2.chartdate < add_hours(n1.chartdate_start, 12) 
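
The registered UDF can also be invoked from the DataFrame API, which covers the 24- and 48-hour cases from the question as well; a minimal sketch, assuming a hypothetical noteevents DataFrame holding the NOTEEVENTS data:

import org.apache.spark.sql.functions.{callUDF, col, lit}

// Hypothetical: noteevents holds the NOTEEVENTS data from the question.
val withCutoffs = noteevents
    .withColumn("chartdate_plus_12", callUDF("add_hours", col("chartdate"), lit(12)))
    .withColumn("chartdate_plus_24", callUDF("add_hours", col("chartdate"), lit(24)))
    .withColumn("chartdate_plus_48", callUDF("add_hours", col("chartdate"), lit(48)))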

You can also use the unix_timestamp function to compute the new date. In my opinion this is less readable, but it can use whole-stage code generation, as in the other answer by Anton Okolnychyi:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{from_unixtime, unix_timestamp}

// Column-based helper: shift a timestamp column by a given number of hours.
val addHours = (datetime: Column, hours: Column) => {
    from_unixtime(unix_timestamp(datetime) + hours * 60 * 60)
}
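
A minimal usage sketch with the DataFrame API, assuming the same hypothetical noteevents DataFrame with a chartdate column:

import org.apache.spark.sql.functions.{col, lit}

// Hypothetical: noteevents holds the NOTEEVENTS data from the question.
val shifted = noteevents.withColumn("chartdate_plus_12", addHours(col("chartdate"), lit(12)))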

@Ahsan great :) If you run into performance problems, take a look at Anton Okolnychyi's answer - native functions can potentially be pushed down. In this case, though, I don't think the predicate can be pushed down, since it operates on only a few tables, so the UDF should be fine and is easier to read. –


What about using the unix_timestamp() function to convert the datetime into seconds and then adding hours * 60 * 60 to it? That stays within Spark's native functions, so it can benefit from whole-stage code generation.

Then your condition would look like this:

unix_timestamp(n2.chartdate) < (unix_timestamp(n1.chartdate_start) + 12 * 60 * 60)
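
Plugged back into the query from the question, it could look like the sketch below, assuming NOTEEVENTS is registered as a temporary table:

// Sketch: the question's query with the PostgreSQL interval replaced by unix_timestamp() arithmetic.
val notes12h = sqlContext.sql("""
    SELECT N1.subject_id, '12-HOUR' AS notes_period, N1.chartdate_start, N2.chartdate, N2.text
    FROM NOTEEVENTS N2,
         (SELECT subject_id, MIN(chartdate) chartdate_start
          FROM NOTEEVENTS
          WHERE subject_id = 283
          AND category != 'Discharge summary'
          GROUP BY subject_id) N1
    WHERE N2.subject_id = N1.subject_id
    AND unix_timestamp(N2.chartdate) < unix_timestamp(N1.chartdate_start) + 12 * 60 * 60
""")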


I have tried this approach and it is a perfectly viable and nice solution. I was just wondering whether there is something like add_months for this. Thanks. – Ahsan


The same as in PostgreSQL, you can use INTERVAL. In SQL:

spark.sql("""SELECT current_timestamp() AS now, 
        current_timestamp() + INTERVAL 12 HOURS AS now_plus_twelve""" 
).show(false) 
+-----------------------+-----------------------+ 
|now                    |now_plus_twelve        |
+-----------------------+-----------------------+ 
|2017-12-14 10:49:15.115|2017-12-14 22:49:15.115| 
+-----------------------+-----------------------+ 

With the Dataset API - Scala:

import org.apache.spark.sql.functions.{current_timestamp, expr} 

spark.range(1) 
    .select(
    current_timestamp as "now", 
    current_timestamp + expr("INTERVAL 12 HOURS") as "now_plus_twelve" 
).show(false) 
+-----------------------+-----------------------+ 
|now                    |now_plus_twelve        |
+-----------------------+-----------------------+ 
|2017-12-14 10:56:59.185|2017-12-14 22:56:59.185| 
+-----------------------+-----------------------+ 

In Python:

from pyspark.sql.functions import current_timestamp, expr 

(spark.range(1).select(
    current_timestamp().alias("now"), 
    (current_timestamp() + expr("INTERVAL 12 HOURS")).alias("now_plus_twelve")))
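
The same INTERVAL literal also works column-wise, so the condition from the question could be expressed through the DataFrame API as well; a sketch, assuming hypothetical noteevents and mins DataFrames (the N2 table and the N1 subquery result, respectively):

import org.apache.spark.sql.functions.{col, expr}

// Hypothetical DataFrames: noteevents (N2) and mins (the N1 aggregation from the question).
val joined = noteevents.as("n2")
    .join(mins.as("n1"), col("n2.subject_id") === col("n1.subject_id"))
    .where(col("n2.chartdate") < col("n1.chartdate_start") + expr("INTERVAL 12 HOURS"))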