2017-03-07 76 views
0

我有一个Spark Streaming应用程序正在运行,它使用mapWithState函数来跟踪RDD的状态。 应用程序运行罚款几分钟,但然后用为什么火花工的内存使用量会随着时间而增加?

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 373 

崩溃我观察到的星火应用增加了内存使用情况随时间线性,即使我已经为mapWithStateRDD超时。请参阅下面的代码片段和内存使用情况 -

val completedSess = sessionLines 
        .mapWithState(StateSpec.function(trackStateFunction _) 
        .numPartitions(80) 
        .timeout(Minutes(5))) 

enter image description here

为什么要增加内存随时间线性如果每个RDD明确超时?

我试图增加内存,但没关系。我错过了什么?

编辑 - 代码参考

高清trackStateFunction(batchTime:时间,关键:字符串值:选项[字符串],状态:国发[(布尔,列表[字符串]龙)]):选项[ (布尔,列表[字符串])] = {

def updateSessions(newLine: String): Option[(Boolean, List[String])] = { 
    val currentTime = System.currentTimeMillis()/1000 

    if (state.exists()) { 
     val newLines = state.get()._2 :+ newLine 

     //check if end of Session reached. 
     // if yes, remove the state and return. Else update the state 
     if (isEndOfSessionReached(value.getOrElse(""), state.get()._4)) { 
     state.remove() 
     Some(true, newLines) 
     } 
     else { 
     val newState = (false, newLines, currentTime) 
     state.update(newState) 
     Some(state.get()._1, state.get()._2) 
     } 
    } 
    else { 
     val newState = (false, List(value.get), currentTime) 
     state.update(newState) 
     Some(state.get()._1, state.get()._2) 
    } 
    } 

    value match { 
    case Some(newLine) => updateSessions(newLine) 
    case _ if state.isTimingOut() => Some(true, state.get()._2) 
    case _ => { 
     println("Not matched to any expression") 
     None 
    } 
    } 
} 
+1

您有多少传入流量?多少RAM /磁盘?我们需要更多信息。 –

+1

另外,您有多久检查一次? –

+1

我有一个由4名工作人员组成的集群(8个内核,32 GB RAM,每个128 GB SSD)。来自Kinesis Stream的传入流量为10-15 MB/s。批处理间隔为10秒。检查点间隔为60s – cmbendre

回答

1

根据mapwithstate的信息: 国家规范 初始状态为RDD - 你可以从一些卖场加载初始状态,然后开始新的数据流作业与那个状态。

分区数量 - 键值状态dstream通过键进行分区。如果您之前对状态的大小有很好的估计,则可以提供分区的数量来相应地对其进行分区。

分区程序 - 您还可以提供自定义分区程序。默认分区程序是散列分区程序。如果您对密钥空间有很好的理解,那么您可以提供一个自定义分区器,它可以执行比默认散列分区程序更高效的更新。

超时 - 这将确保其值未在特定时间段内更新的键将从状态中移除。这可以帮助清理旧键的状态。

所以,超时只能用一段时间后清理,而没有更新的键。内存将运行完整并最终阻塞,因为执行程序没有分配足够的内存。这给了MetaDataFetchFailed异常。随着记忆力的增加,我希望你的意思是执行者。即使增加执行程序的内存可能也不起作用,因为该流仍在继续。使用MapWithState,会话线将包含与输入dstream相同的记录数。所以要解决这个问题就是让你的dstream更小。在流媒体方面,你可以设置一个批次的时间间隔,这将最有可能解决这个

VAL SSC =新的StreamingContext(SC,秒(batchIntervalSeconds))

记得在同时做一次快照和检查点。快照将允许您使用来自之前丢失的流的信息进行其他计算。希望这有助于了解更多信息,请参阅:https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.htmlhttp://asyncified.io/2016/07/31/exploring-stateful-streaming-with-apache-spark/

相关问题