Pyspark: Uncaught exception in thread heartbeat-receiver-event-loop-thread

I have the following PySpark code. For each user_id taken from self.user_RDD, it merges the products returned as product_CF with the product_list already stored for that user, then saves the combined list back to Redis.

import json
import redis

# Assumed setup: the question does not show how redis_client was created,
# so a local instance is used here as a placeholder.
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

for user_id in self.user_RDD.collect():
    product_CF = self.getpreferredProducts(user_id)
    try:
        product_list = json.loads(redis_client.hget('user_products', user_id))
        # Drop products already present, then append the rest of product_CF.
        for product_id in product_list:
            if product_id in product_CF:
                product_CF.remove(product_id)
        product_list.extend(product_CF)
        # The original wrote via a second client name, r, and wrapped
        # json.dumps in a redundant str(); a single client is assumed here.
        redis_client.hset('score', user_id, json.dumps(product_list))
    except Exception as e:
        print(e)

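Note that self.user_RDD.collect() pulls every user_id back to the driver, which then does all the Redis work in one long loop; that kind of driver-side load is exactly what can starve Spark's heartbeat machinery on a large dataset. As a rough sketch only (not the fix that ultimately worked here, and assuming the merge step can be computed without driver-side calls such as getpreferredProducts), the writes could instead be pushed out to the executors:

    import json
    import redis

    def write_scores(user_ids):
        # One connection per partition; host/port are illustrative placeholders.
        client = redis.StrictRedis(host='localhost', port=6379, db=0)
        for user_id in user_ids:
            raw = client.hget('user_products', user_id)
            product_list = json.loads(raw) if raw else []
            # ... merge product_CF into product_list here, as in the loop above ...
            client.hset('score', user_id, json.dumps(product_list))

    self.user_RDD.foreachPartition(write_scores)
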
Partway through a run on a huge dataset, execution stalls and the following exception is thrown:

17/04/21 12:10:00 ERROR Utils: Uncaught exception in thread heartbeat-receiver-event-loop-thread 
java.lang.NullPointerException 
    at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:464) 
    at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:439) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6$$anonfun$7.apply(TaskSchedulerImpl.scala:408) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6$$anonfun$7.apply(TaskSchedulerImpl.scala:408) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6.apply(TaskSchedulerImpl.scala:408) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6.apply(TaskSchedulerImpl.scala:407) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186) 
    at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:407) 
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129) 
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1283) 
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
17/04/21 12:10:10 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@7b7d4d24,BlockManagerId(driver, 172.31.26.252, 35155, None))] in 1 attempts 
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 10 seconds. This timeout is controlled by spark.executor.heartbeatInterval 
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216) 
    at scala.util.Try$.apply(Try.scala:192) 
    at scala.util.Failure.recover(Try.scala:216) 
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326) 
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326) 
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
    at org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) 
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136) 
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
    at scala.concurrent.Promise$class.complete(Promise.scala:55) 
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) 
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) 
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) 
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63) 
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78) 
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55) 
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55) 
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
    at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54) 
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) 
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106) 
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) 
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
    at scala.concurrent.Promise$class.tryFailure(Promise.scala:112) 
    at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153) 
    at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205) 
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 10 seconds 
    ... 8 more 
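
The second message points at spark.executor.heartbeatInterval, which is an ordinary Spark setting. For completeness (this was not the fix that worked here, and raising timeouts can mask rather than cure driver overload), a minimal sketch of loosening it, with illustrative values:

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("score-writer")  # illustrative app name
            .set("spark.executor.heartbeatInterval", "60s")
            # the network timeout must stay larger than the heartbeat interval
            .set("spark.network.timeout", "600s"))
    sc = SparkContext(conf=conf)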

Answer

It was indeed a space problem. I had been storing score in a local Redis instance; once I created an AWS Redis instance and stored the data there instead, the problem went away.
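
For anyone reproducing this, the change amounts to pointing the client at the remote instance instead of localhost. A minimal sketch, with a hypothetical ElastiCache endpoint rather than the real one:

    import json
    import redis

    # Hypothetical endpoint; substitute your own cluster address.
    redis_client = redis.StrictRedis(
        host='my-cluster.abc123.0001.use1.cache.amazonaws.com',
        port=6379,
        db=0)

    # Same write as in the question, now against the remote instance.
    redis_client.hset('score', 'user-42', json.dumps(['p1', 'p2']))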
