2017-03-03 40 views
1

什么是错,此代码试图改变一个datetime列的一天pyspark变化一天datetime列

import pyspark 
import pyspark.sql.functions as sf 
import pyspark.sql.types as sparktypes 
import datetime 

sc = pyspark.SparkContext(appName="test") 
sqlcontext = pyspark.SQLContext(sc) 

rdd = sc.parallelize([('a',datetime.datetime(2014, 1, 9, 0, 0)), 
         ('b',datetime.datetime(2014, 1, 27, 0, 0)), 
         ('c',datetime.datetime(2014, 1, 31, 0, 0))]) 
testdf = sqlcontext.createDataFrame(rdd, ["id", "date"]) 

print(testdf.show()) 
print(testdf.printSchema()) 

给出了测试数据框:

+---+--------------------+ 
| id|    date| 
+---+--------------------+ 
| a|2014-01-09 00:00:...| 
| b|2014-01-27 00:00:...| 
| c|2014-01-31 00:00:...| 
+---+--------------------+ 


root 
|-- id: string (nullable = true) 
|-- date: timestamp (nullable = true) 

然后我定义一个UDF改变天日期栏:

def change_day_(date, day): 
    return date.replace(day=day) 

change_day = sf.udf(change_day_, sparktypes.TimestampType()) 
testdf.withColumn("PaidMonth", change_day(testdf.date, 1)).show(1) 

这就提出了一个错误:

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace: 
py4j.Py4JException: Method col([class java.lang.Integer]) does not exist 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339) 
    at py4j.Gateway.invoke(Gateway.java:274) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:745) 
+1

也许尝试用'lit(1)'替换'1'(在调用'change_day')后,'从pyspark.sql.functions导入点亮'后执行'? –

+0

谢谢!这工作! – muon

+0

@ArthurTacca你能解释一下为什么? – muon

回答

1

假设接收多个参数的udf接收多个。 “1”不是一列。

这意味着您可以执行以下操作之一。要么让它作为意见建议栏:

testdf.withColumn("PaidMonth", change_day(testdf.date, lit(1))).show(1) 

亮(1)是那些

或使原有的函数返回一个高阶函数的一列:

def change_day_(day): 
    return lambda date: date.replace(day=day) 

change_day = sf.udf(change_day_(1), sparktypes.TimestampType()) 
testdf.withColumn("PaidMonth", change_day(testdf.date)).show(1) 

这基本上创建了一个替换为1的函数,因此可以接收一个整数。 udf将适用于单列。

+0

你的第二个解决方案的工作原理,但我不明白如何通过日期参数 – muon

+0

cahnge_day_函数创建一个函数。该函数使用change_day_中的day参数作为常量。 –

+1

@muon换句话说,用'lit(1)'将第一个参数作为参数传递给workers *上的'change_day' *。通过这个解决方案,'change_day _()'在驱动程序*上传递数字1 *,它返回一个函数,然后传递给工作人员。 (第二个函数包含数字1的隐藏副本,因此数字1仍然最终传递给所有工作人员。) –

0

感谢@ ArthurTacca的评论,关键是要使用pyspark.sql.functions.lit()功能如下:

testdf.withColumn("PaidMonth", change_day(testdf.date, sf.lit(1))).show() 

备选答案的欢迎!