2017-07-26 59 views
0

我在火花一个模式作为如何在重新计算后替换spark数据框中的值?

root 
|-- atom: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- dailydata: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- datatimezone: string (nullable = true) 
| | | | |-- intervaltime: long (nullable = true) 
| | | | |-- intervalvalue: long (nullable = true) 
| | | | |-- utcacquisitiontime: string (nullable = true) 
| | |-- usage: string (nullable = true) 
| -- titlename: string (nullable = true) 

我已提取的utcacquisitiontime和从以上模式datatimezone如下

val result=q.selectExpr("explode(dailydata) as r").select("r.utcacquisitiontime","r.datatimezone") 

+--------------------+------------+ 
| utcacquisitiontime|datatimezone| 
+--------------------+------------+ 
|2017-03-27T22:00:00Z|  +02:00| 
|2017-03-27T22:15:00Z|  +02:00| 
|2017-03-27T22:30:00Z|  +02:00| 
|2017-03-27T22:45:00Z|  +02:00| 
|2017-03-27T23:00:00Z|  +02:00| 
|2017-03-27T23:15:00Z|  +02:00| 
|2017-03-27T23:30:00Z|  +02:00| 
|2017-03-27T23:45:00Z|  +02:00| 
|2017-03-28T00:00:00Z|  +02:00| 
|2017-03-28T00:15:00Z|  +02:00| 
|2017-03-28T00:30:00Z|  +02:00| 
|2017-03-28T00:45:00Z|  +02:00| 
|2017-03-28T01:00:00Z|  +02:00| 
|2017-03-28T01:15:00Z|  +02:00| 
|2017-03-28T01:30:00Z|  +02:00| 
|2017-03-28T01:45:00Z|  +02:00| 
|2017-03-28T02:00:00Z|  +02:00| 
|2017-03-28T02:15:00Z|  +02:00| 
|2017-03-28T02:30:00Z|  +02:00| 
|2017-03-28T02:45:00Z|  +02:00| 
+--------------------+------------+ 

我需要使用这两个列计算localtime和由localtime替换它们经过计算。我应该如何计算localtime并替换它?

+3

可以使用'withColumn'方法上数据帧,并使用[火花功能](https://spark.apache.org/docs/2.0.2/api/java/o rg/apache/spark/sql/functions.html) –

回答

2

你可以依赖spark中的udf函数(User Defined Function)。另外在org.apache.sql.functions._中有很多已经预定义的函数可以帮助你。但这里是你如何使这项工作

+-------------------+------------+ 
| utcacquisitiontime|datatimezone| 
+-------------------+------------+ 
|2017-03-27T22:00:00|  +02:00| 
+-------------------+------------+ 

请注意,我已经从时间列中删除不必要的“Z”。 使用JodaTime依赖这样定义一个UDF功能:

val toTimestamp = udf((time:String, zone:String) => { 
     val timezone = DateTimeZone.forID(zone) 
    val df = DateTimeFormat.forPattern("yyyy-mm-dd'T'HH:mm:ss") 
    new java.sql.Timestamp(df.withZone(timezone).parseDateTime(time).getMillis) 
    }) 

withColumn

df.withColumn("timestamp", toTimestamp(col("utcacquisitiontime"), col("datatimezone")) 

的结果显示(注意,在该模式中的列时间戳类型的时间戳,所以你将它应用在列可以做在其上的日期操作)

+-------------------+------------+--------------------+ 
| utcacquisitiontime|datatimezone|   timestamp| 
+-------------------+------------+--------------------+ 
|2017-03-27T22:00:00|  +02:00|2017-01-27 22:00:...| 
+-------------------+------------+--------------------+ 

root 
|-- utcacquisitiontime: string (nullable = true) 
|-- datatimezone: string (nullable = true) 
|-- timestamp: timestamp (nullable = true) 
+0

谢谢!它帮助! – Ninja

+0

如何将'timestamp'追加到根?如果你能帮忙,那真的很感激。 – Ninja

+1

df.drop(“utcacquisitiontime”)。drop(“datatimezone”)。如果它对你有帮助,也请投票 – dumitru

0

您可以使用Joda Time API时间在DF列通过执行类似转换为本地时间,

def convertToLocal(str:String):String = new DateTime(str).toLocalDateTime().toString 

下一个导入SQL implicits通过,

import ss.implicits._ 

,其中SS是实例您的SparkSession。为了utcacquisitiontime列localDateTime的每个元素进行转换,做这样的事情,

val df=result map(r=>(convertToLocal(r.getString(0)),r.getString(1))) 

df show 

让我知道,如果这有助于。干杯。

+0

感谢您的帮助! – Ninja

+0

你能不能帮助我如何追加这个新的df到原来的json模式,我有上面这样'utcacquisitiontime' | 'datatimezone'被替换为新的'localtimestamp'计算? – Ninja

相关问题