
User-defined function breaks PySpark DataFrame

My Spark version is 1.3 and I am using PySpark.

I have a large DataFrame called df.

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 
df = sqlContext.parquetFile("events.parquet") 

Then I select a few columns of the DataFrame and try to count the rows. This works fine.

df3 = df.select("start", "end", "mrt") 
print(type(df3)) 
print(df3.count()) 

Then I apply a user-defined function to convert one of the columns from a string to a number; this also works fine.

from pyspark.sql.functions import UserDefinedFunction 
from pyspark.sql.types import LongType 
CtI = UserDefinedFunction(lambda i: int(i), LongType()) 
df4 = df2.withColumn("mrt-2", CtI(df2.mrt)) 

However, if I try to count the rows I get an exception, even though the type shows that it is a DataFrame just like df3.

print(type(df4)) 
print(df4.count()) 

The error I get:

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-10-53941e183807> in <module>() 
     8 df4 = df2.withColumn("mrt-2", CtI(df2.mrt)) 
     9 print(type(df4)) 
---> 10 print(df4.count()) 
    11 df3 = df4.select("start", "end", "mrt-2").withColumnRenamed("mrt-2", "mrt") 

/data/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/spark/python/pyspark/sql/dataframe.py in count(self) 
    299   2L 
    300   """ 
--> 301   return self._jdf.count() 
    302 
    303  def collect(self): 

/data/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    536   answer = self.gateway_client.send_command(command) 
    537   return_value = get_return_value(answer, self.gateway_client, 
--> 538     self.target_id, self.name) 
    539 
    540   for temp_arg in temp_args: 

/data/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    298     raise Py4JJavaError(
    299      'An error occurred while calling {0}{1}{2}.\n'. 
--> 300      format(target_id, '.', name), value) 
    301    else: 
    302     raise Py4JError(

Py4JJavaError: An error occurred while calling o152.count. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1379 in stage 12.0 failed 4 times, most recent failure: Lost task 1379.3 in stage 12.0 (TID 27021, va1ccogbds01.lab.ctllabs.io): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/data/0/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/spark-assembly-1.3.0-cdh5.4.7-hadoop2.6.0-cdh5.4.7.jar/pyspark/worker.py", line 101, in main 
    process() 
    File "/data/0/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/spark-assembly-1.3.0-cdh5.4.7-hadoop2.6.0-cdh5.4.7.jar/pyspark/worker.py", line 96, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/data/0/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/spark-assembly-1.3.0-cdh5.4.7-hadoop2.6.0-cdh5.4.7.jar/pyspark/serializers.py", line 236, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "/data/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/spark/python/pyspark/sql/functions.py", line 119, in <lambda> 
    File "<ipython-input-10-53941e183807>", line 7, in <lambda> 
TypeError: int() argument must be a string or a number, not 'NoneType' 

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135) 
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:98) 
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:94) 
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) 
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anon$1.hasNext(RDD.scala:743) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:127) 
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:124) 
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) 
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
at org.apache.spark.scheduler.Task.run(Task.scala:64) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1210) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1198) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1198) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
at scala.Option.foreach(Option.scala:236) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1400) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1361) 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
--------------------------------------------------------------------------- 

Am I using the user-defined function correctly? Any idea why the DataFrame functions don't work on this DataFrame?


You've left out the traceback! How is anyone supposed to figure out an error like this? – eliasah


@eliasah Fixed; the formatting was a pain. – deltap


Can you print the schema? –

Answers


From the stack trace it looks like your column contains a None value, which breaks the int cast; you could try changing your lambda function to lambda i: int(i) if i else None to handle that case.
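For illustration, a minimal sketch of that change, reusing the DataFrame and column names from the question (the None guard is the only difference from the original UDF):

from pyspark.sql.functions import UserDefinedFunction 
from pyspark.sql.types import LongType 

# Guard against missing values: int(None) raises TypeError, so map None 
# (and empty strings) to None and let the new column be nullable. 
CtI = UserDefinedFunction(lambda i: int(i) if i else None, LongType()) 
df4 = df2.withColumn("mrt-2", CtI(df2.mrt)) 
print(df4.count()) 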

Note that just because df2.withColumn("mrt-2", CtI(df2.mrt)) doesn't throw an error doesn't mean your code is correct: Spark evaluates lazily, so it won't actually try to run the UDF until you call count, collect, or a similar action.
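As a small illustration of that lazy behaviour (again reusing the names from the question):

# withColumn is a transformation: it only builds the query plan, so the 
# original, unsafe lambda does not fail here and df4 looks perfectly fine. 
df4 = df2.withColumn("mrt-2", CtI(df2.mrt)) 
print(type(df4)) 

# count() is an action: the UDF finally executes on the workers here, so 
# this is the first point where a None value in mrt can raise the TypeError. 
print(df4.count()) 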


Thanks, that worked for me. – deltap


Are you using a Spark notebook? I once hit the same error in a Spark notebook, but the same code ran fine when submitted with spark-submit:

spark-submit YOURFILE.py