2017-02-14 73 views

Amazon EMR PySpark: rdd.distinct().count() failing

I am currently using an EMR cluster connected to RDS to collect data from 2 tables.

The two RDDs created are quite large, but I can perform .take(x) operations on them.

I can also perform more complex operations such as:

info_rdd = somerdd.map(lambda x: (x[1], x[2])).groupByKey().map(some_lambda) 
apps_rdd = apps.join(info_rdd).map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1]))) 

But performing the following operation, to count the number of distinct users imported from RDS, does not work:

unique_users = rdd.distinct().count() 

I have tried many configurations to see whether it was a memory problem (just in case, but that did not solve the issue)...

These are the errors I am getting:

Traceback (most recent call last): 
File "/home/hadoop/AppEngine/src/server.py", line 56, in <module> 
run_server() 
File "/home/hadoop/AppEngine/src/server.py", line 53, in run_server 
AppServer().run() 
File "/home/hadoop/AppEngine/src/server.py", line 45, in run 
api = create_app(self.context, self.apps, self.devices) 
File "/home/hadoop/AppEngine/src/api.py", line 190, in create_app 
engine = AppEngine(spark_context, apps, devices) 
File "/home/hadoop/AppEngine/src/engine.py", line 56, in __init__ 
self.unique_users = self.ratings.distinct().count() 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1041, in count 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1032, in sum 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 906, in fold 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in collect 
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco 

File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.5 in stage 0.0 (TID 5, ip-172-31-3-140.eu-west-1.compute.internal, executor 13): ExecutorLostFailure (executor 13 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 164253 ms 
Driver stacktrace: 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
at scala.Option.foreach(Option.scala:257) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958) 
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.RDD.collect(RDD.scala:934) 
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453) 
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
at py4j.Gateway.invoke(Gateway.java:280) 
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
at py4j.commands.CallCommand.execute(CallCommand.java:79) 
at py4j.GatewayConnection.run(GatewayConnection.java:214) 
at java.lang.Thread.run(Thread.java:745) 

I was referring to the exception in the message: 'ExecutorLostFailure (executor 13 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 164253 ms' – mrsrinivas

Answers


The solution was the following:

I did not have enough memory to execute the tasks. I changed the type of the core instances used in my cluster to one with more available memory (m4.4xlarge here).

Then I had to pass parameters to spark-submit to force the memory allocation of my instances:

--driver-memory 2G 
--executor-memory 50G 

You can also add these parameters to avoid failures caused by heartbeats or by memory allocation on a heavy task:

--conf spark.yarn.executor.memoryOverhead=XXX (large number such as 1024 or 4096) 
--conf spark.executor.heartbeatInterval=60s 
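As a sketch, the same settings can also be expressed programmatically when you build the context yourself. The values below are simply the ones from this answer; note that on YARN, driver memory and the memory overhead are resource requests made at submit time, so the spark-submit flags above remain the reliable way to set them:

```python
from pyspark import SparkConf, SparkContext

# Programmatic equivalent of the spark-submit flags above.
# Caveat: on YARN, --driver-memory and the memoryOverhead must be known
# before the JVM starts, so prefer the spark-submit flags for those; this
# form mainly helps for executor-side settings read at launch.
conf = (SparkConf()
        .set("spark.driver.memory", "2g")
        .set("spark.executor.memory", "50g")
        .set("spark.yarn.executor.memoryOverhead", "4096")
        .set("spark.executor.heartbeatInterval", "60s"))

sc = SparkContext(conf=conf)
```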
ExecutorLostFailure Reason: Executor heartbeat timed out after 164253 ms

This error means that the executor did not respond for 165 seconds and was killed (on the assumption that it was dead).

Timeout

If you have a long task that keeps an executor busy and that has to complete, you can try the following setting on the command line, which increases the heartbeat timeout to a very long value, as described here: https://stackoverflow.com/a/37260231/5088142

Some ways to investigate this issue can be found here: https://stackoverflow.com/a/37272249/5088142


The following tries to clarify some of the issues raised in your question.

Spark Actions vs Transformations

Spark uses lazy evaluation: executing a transformation does not actually run it. Spark only performs computation when you execute an action.

The complex-operation example you gave performs no action, i.e. nothing is executed/computed:

info_rdd = somerdd.map(lambda x: (x[1], x[2])).groupByKey().map(some_lambda) 
apps_rdd = apps.join(info_rdd).map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1]))) 

Reviewing the spark doc about transformations,

you can see that all the operations used in your example (map, groupByKey and join) are transformations.

Hence, after executing those commands, nothing has actually been done.
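Spark's lazy evaluation behaves much like Python generators. This plain-Python analogy (no Spark required) illustrates that a "transformation" only records a recipe, and nothing runs until something forces evaluation:

```python
def transform(values, fn):
    # Analogue of an RDD transformation: builds a lazy pipeline,
    # computes nothing yet.
    for v in values:
        yield fn(v)

log = []

def record(x):
    log.append(x)       # side effect so we can observe when work happens
    return x * 2

lazy = transform(range(5), record)  # "transformation": nothing runs yet
assert log == []                    # no element has been touched

result = list(lazy)                 # "action": forces the whole pipeline
assert log == [0, 1, 2, 3, 4]       # only now has the function run
assert result == [0, 2, 4, 6, 8]
```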

行动

The two RDDs created are quite large, but I can perform .take(x) operations on them.

The difference between the take(x) action and count():

take(x) ends after returning the first x elements.

count() ends only after going over the entire RDD.

The fact that you executed some transformations (as in the example above) which seemed to run is meaningless, since they were not actually executed.

Running a take(x) action cannot give any indication either, as it uses only a very small portion of the RDD.
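The same generator analogy shows why take(x) can succeed where count() fails: take only has to materialise a short prefix, while count must traverse everything (here itertools.islice plays the role of take):

```python
import itertools

seen = []

def tag(x):
    seen.append(x)   # track how many elements are actually computed
    return x

stream = (tag(x) for x in range(1_000_000))

# Analogue of take(5): only the first 5 elements are ever computed.
first_five = list(itertools.islice(stream, 5))
assert seen == [0, 1, 2, 3, 4]

# Analogue of count(): must consume everything that remains.
total = len(first_five) + sum(1 for _ in stream)
assert total == 1_000_000
```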

结论

It seems that your machine configuration does not support the size of the data you are working with, or that your code creates heavy tasks that cause executors to hang for long periods (160 seconds).

The count action was in fact the first action you executed on your RDD, which is why it is the one that surfaced the problem.