0
我运行这段代码:内存在星火上所填斯卡拉
for (i <- Range(1, listOfCities.value.length)) {
val city = listOfCities.value(i)
logger.info(s"Started with city=$city")
val data = getDataFromQueries(redshiftConnector, sparkSession, city)
val transactions = data.map { pidData =>
pidData.p_list.trim.split(',')
}
val result: RDD[CRule[String]] =
run(sparkSession, transactions.rdd, numPartitions)
val resultWithCity = result.map { rule =>
(rule, city)
}
val df: DataFrame =
convertResultToDataframe(sparkSession, resultWithCity)
writeToRedshift(redshiftConnector, df, tableName)
}
每个结果由行的LACS的。该代码适用于2-3个城市。但随后工作人员的记忆开始填满,程序变得非常缓慢。为什么内存被填满了?垃圾收集没有发生,或者在我的代码中有一些内存泄漏?我正在监测每个城市的记忆需求,并且每当城市完成时它每次都不会下降。 每次迭代后我应该如何清理RAM?由于
配置 - 3名m4.2xlarge工人,8个节点,30 GB RAM,6个执行人有4核,每个核