0
我在下面有下面的代码。基本上我想要做的是从现有的值中生成一些新的列。在完成之后,我将新数据列保存为群集中的表格。对不起,我还是新手。Pyspark:使用lambda函数和.withColumn会产生无类型错误我无法理解
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
from pyspark.sql.functions import udf, array
from pyspark.sql.types import DecimalType
import numpy as np
import math
df = sqlContext.sql('select * from db.mytable')
angle_av = udf(lambda (x, y): -10 if x == 0 else math.atan2(y/x)*180/np.pi, DecimalType(20,10))
df = df.withColumn('a_v_angle', angle_av(array('a_v_real', 'a_v_imag')))
df.createOrReplaceTempView('temp')
sqlContext.sql('create table new_table as select * from temp')
这些操作实际上不产生任何错误。然后我试图存储DF作为一个表,并收到以下错误(我猜,因为这是时的动作的实际执行):
File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 171, in main
process()
File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 166, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 103, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<stdin>", line 14, in <lambda>
TypeError: unsupported operand type(s) for /: 'NoneType' and 'NoneType'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
请张贴产生错误 – desertnaut
产生错误的命令的确切的命令是:sqlContext.sql( '创建NEW_TABLE如SELECT * FROM临时') –