2017-03-02 60 views
0

我运行这段代码:内存在星火上所填斯卡拉

for (i <- Range(1, listOfCities.value.length)) { 
     val city = listOfCities.value(i) 
     logger.info(s"Started with city=$city") 
     val data = getDataFromQueries(redshiftConnector, sparkSession, city) 
     val transactions = data.map { pidData => 
     pidData.p_list.trim.split(',') 
     } 

     val result: RDD[CRule[String]] = 
     run(sparkSession, transactions.rdd, numPartitions) 
     val resultWithCity = result.map { rule => 
     (rule, city) 
     } 
     val df: DataFrame = 
     convertResultToDataframe(sparkSession, resultWithCity) 
     writeToRedshift(redshiftConnector, df, tableName) 
    } 

每个结果由行的LACS的。该代码适用于2-3个城市。但随后工作人员的记忆开始填满,程序变得非常缓慢。为什么内存被填满了?垃圾收集没有发生,或者在我的代码中有一些内存泄漏?我正在监测每个城市的记忆需求,并且每当城市完成时它每次都不会下降。 每次迭代后我应该如何清理RAM?由于

配置 - 3名m4.2xlarge工人,8个节点,30 GB RAM,6个执行人有4核,每个核

回答

0

这是一个非常不好的做法,在星火使用for循环,因为星火优化基本上是愚蠢的。

我的想法是首先将所有数据放入一个公共数据框。所以基本上你会做到以下几点:

var city = listOfCities.value(1) 
    var data = getDataFromQueries(redshiftConnector, sparkSession, city) 
    for (i <- Range(2, listOfCities.value.length)){ 
    var city = listOfCities.value(i) 
    var data = data.unionAll(getDataFromQueries(redshiftConnector, sparkSession, city)) 
    } 

后来才与你PROGRAMM继续为“大”的数据帧