
My Spark Scala code never finishes its second iteration: the task hangs at this line:

val wordCountWithLabelsCollect = wordCountWithLabels.collect 

This is the output:

15/06/19 15:49:33 INFO DAGScheduler: Submitting Stage 1 (MappedValuesRDD[3] at mapValues at Ques.scala:33), which has no missing parents 
15/06/19 15:49:33 INFO MemoryStore: ensureFreeSpace(2480) called with curMem=2219, maxMem=1030823608 
15/06/19 15:49:33 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.4 KB, free 983.1 MB) 
15/06/19 15:49:33 INFO MemoryStore: ensureFreeSpace(1812) called with curMem=4699, maxMem=1030823608 
15/06/19 15:49:33 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1812.0 B, free 983.1 MB) 
15/06/19 15:49:33 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:54590 (size: 1812.0 B, free: 983.1 MB) 
15/06/19 15:49:33 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0 
15/06/19 15:49:33 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838 
15/06/19 15:49:33 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedValuesRDD[3] at mapValues at Ques.scala:33) 
15/06/19 15:49:33 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 

The Scala code:

import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 
import org.apache.spark.SparkContext._ 
import org.apache.log4j.Logger 
import org.apache.log4j.Level 

object Ques extends App { 

    val data = getUncategorisedData 

    data.foreach(document => { 

    runner 

    }) 

    case class Document(label: String, text: String) 

    def reduceList(list: List[(String, Int)]) = list.groupBy(_._1).mapValues(_.aggregate(0)(_ + _._2, _ + _)) 

    def runner = { 
    val trainingData = getSC.parallelize(

     List(
     Document("sport", "this is text for document spor a"), 
     Document("sport", "this spor is text for document spor b"), 
     Document("news", "this is such a new categorise data"))) 

    val counts: org.apache.spark.rdd.RDD[(String, List[(String, Int)])] = trainingData.map(doc => ((doc.label, doc.text.split(" ").toList.map(w => (w, 1))))) 

    val mergedList = counts.mapValues((list: List[(String, Int)]) => reduceList(list).toList) 
    val wordCountWithLabels: org.apache.spark.rdd.RDD[(String, List[(String, Int)])] = mergedList.reduceByKey((accum: List[(String, Int)], value: List[(String, Int)]) => 
     { 
     val valueMap = value.toMap 
     val accumMap = accum.toMap 
     val mergedMap = accumMap ++ valueMap.map { case (k, v) => k -> (v + accumMap.getOrElse(k, 0)) } 
     mergedMap.toList 
     }) 

    val wordCountWithLabelsCollect = wordCountWithLabels.collect // hangs here on the second iteration 
    wordCountWithLabels.collect 
    } 

    def getUncategorisedData: RDD[Document] = { 
    lazy val trainingData = getSC.parallelize(

     List(
     Document("", "this is text for document a"), 
     Document("", "this is text for document b"), 
     Document("", "this is text for for document c"))) 

    trainingData 

    } 

    lazy val getSC = { 

    val conf = new org.apache.spark.SparkConf() 
     .setMaster("local") 
     .setAppName("process") 
     .setSparkHome("C:\\spark-1.1.0-bin-hadoop2.4\\spddark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4") 
     .set("spark.executor.memory", "3g") 
     .set("deploy-mode", "standalone") 
     .set("SPARK_CONF_DIR", "c:\\data\\sparkConfig") 

    val sc = new SparkContext(conf) 

    sc 
    } 
} 

What is the problem here?

Calling collect several times on the same RDD should not be a problem, should it?

If I call runner twice in a row:

runner 
runner 

then it terminates.

Update: the same behaviour occurs with a simpler example:

import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 
import org.apache.spark.SparkContext._ 
import org.apache.log4j.Logger 
import org.apache.log4j.Level 
import org.apache.spark.rdd.PairRDDFunctions 

object Ques extends App { 

    val conf = new org.apache.spark.SparkConf() 
    .setMaster("local") 
    .setAppName("process") 
    .setSparkHome("C:\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4") 
    .set("spark.executor.memory", "3g") 

    val sc = new SparkContext(conf) 

    val data = sc.parallelize(List("")) 

    val counts: org.apache.spark.rdd.RDD[(String)] = sc.parallelize(List((""))) 

    data.foreach(document => { 
    counts.collect // collect called from inside the foreach closure -- never returns 
    }) 

} 

This code never terminates either. It looks like you cannot call collect from inside a foreach function?

Update 2:

I do not know why, but collecting everything back to the driver before iterating makes the program terminate:

data.collect.foreach(document => { 
    counts.collect 
    }) 

Answer


I think the simple answer here is that you are trying to call an action from inside a function that Spark ships to the workers (the closure passed to foreach). Actions return values to the driver; calling an action from code that runs on the workers rather than on the driver makes no sense, because only the driver holds the SparkContext. I cannot find anywhere the documentation is more explicit about this than this section of the programming guide.
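
For what it is worth, here is a minimal sketch of the restructuring the asker arrived at in Update 2: collect the outer RDD to the driver first, so the loop runs locally and every action is invoked from driver-side code. The object name and the sample data are illustrative rather than taken from the original post, and this assumes the outer RDD is small enough to collect.

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 

object DriverSideLoop extends App { 

    val conf = new SparkConf() 
     .setMaster("local") 
     .setAppName("process") 

    val sc = new SparkContext(conf) 

    // Illustrative data, standing in for the RDDs in the question. 
    val data = sc.parallelize(List("doc a", "doc b")) 
    val counts = sc.parallelize(List(("word", 1))) 

    // Bring the outer RDD back to the driver first; the foreach then runs 
    // locally, so the collect on counts is an ordinary driver-side action. 
    data.collect.foreach(document => { 
     val result = counts.collect 
     println(document + " -> " + result.mkString(", ")) 
    }) 

    sc.stop() 
} 

The same idea applies to the original runner: as long as the loop that calls it executes on the driver (for example over a collected array), calling collect repeatedly inside it is fine.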
