2017-08-04 126 views
0

我在运行Spark Streaming应用程序,该应用程序从Kafka(使用Direct Stream方法)读取数据并将结果发布回Kafka。应用程序的输入速率以及应用程序的吞吐量保持稳定大约一两个小时。之后,我开始在很长一段时间内看到在Active Batches队列中保留的批次(持续30分钟+)。该Spark driver日志表明以下两种类型的错误,而这些错误发生的时间与卡住批次的开始时间一致得好:Spark Spark待处理批处理

第一个错误类型

ERROR LiveListenerBus: Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler. 

二错误键入

ERROR StreamingListenerBus: Listener StreamingJobProgressListener threw an exception 
java.util.NoSuchElementException: key not found: 1501806558000 ms 
    at scala.collection.MapLike$class.default(MapLike.scala:228) 
    at scala.collection.AbstractMap.default(Map.scala:59) 
    at scala.collection.mutable.HashMap.apply(HashMap.scala:65) 
    at org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134) 
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67) 
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29) 
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63) 
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29) 
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43) 
    at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:75) 
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36) 
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36) 
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63) 
    at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78) 
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1279) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77) 

但是,我不知道如何解释这些错误,尽管广泛的在线搜索,我合作没有找到与此相关的任何有用信息。

问题

  1. 什么这些错误是什么意思?它们是否表示资源限制(例如:CPU,内存等)?
  2. 解决这些错误的最佳方法是什么?

在此先感谢。

回答

0

您的批次持续时间是不是少于实际批处理时间?默认批处理队列大小为1000,因此可以溢出Spark Spark批处理队列。

+0

请考虑重新说明这一点。这看起来像一个平庸的评论,而不是真正回答问题的东西。 – GhostCat