我有一个Spark Streaming应用程序正在运行,它使用mapWithState函数来跟踪RDD的状态。 应用程序运行罚款几分钟,但然后用为什么火花工的内存使用量会随着时间而增加?
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 373
崩溃我观察到的星火应用增加了内存使用情况随时间线性,即使我已经为mapWithStateRDD超时。请参阅下面的代码片段和内存使用情况 -
val completedSess = sessionLines
.mapWithState(StateSpec.function(trackStateFunction _)
.numPartitions(80)
.timeout(Minutes(5)))
为什么要增加内存随时间线性如果每个RDD明确超时?
我试图增加内存,但没关系。我错过了什么?
编辑 - 代码参考
高清trackStateFunction(batchTime:时间,关键:字符串值:选项[字符串],状态:国发[(布尔,列表[字符串]龙)]):选项[ (布尔,列表[字符串])] = {
def updateSessions(newLine: String): Option[(Boolean, List[String])] = {
val currentTime = System.currentTimeMillis()/1000
if (state.exists()) {
val newLines = state.get()._2 :+ newLine
//check if end of Session reached.
// if yes, remove the state and return. Else update the state
if (isEndOfSessionReached(value.getOrElse(""), state.get()._4)) {
state.remove()
Some(true, newLines)
}
else {
val newState = (false, newLines, currentTime)
state.update(newState)
Some(state.get()._1, state.get()._2)
}
}
else {
val newState = (false, List(value.get), currentTime)
state.update(newState)
Some(state.get()._1, state.get()._2)
}
}
value match {
case Some(newLine) => updateSessions(newLine)
case _ if state.isTimingOut() => Some(true, state.get()._2)
case _ => {
println("Not matched to any expression")
None
}
}
}
您有多少传入流量?多少RAM /磁盘?我们需要更多信息。 –
另外,您有多久检查一次? –
我有一个由4名工作人员组成的集群(8个内核,32 GB RAM,每个128 GB SSD)。来自Kinesis Stream的传入流量为10-15 MB/s。批处理间隔为10秒。检查点间隔为60s – cmbendre