2014-08-30 75 views
0

我想在Scala中编写简单的Spark代码。如何在DStream中应用RDD函数,同时在scala中编写代码

这里我得到一个DStream。我成功地能够打印此DStream。但是当我试图在此DStream上执行任何种类的“foreach”,“foreachRDD”或“transform”函数时,在执行我的程序期间,我的控制台正在冻结。在这里,我没有收到任何错误,但是直到我手动终止eclipse控制台操作时,控制台才变得无响应。我在这里附上代码。请告诉我我做错了什么。

我的主要目标是在DStream上应用RDD操作,并根据我的知识使用“foreach”,“foreachRDD”或“transform”函数将我的DStream转换为RDD。

我已经通过使用Java实现了相同。但在斯卡拉我有这个问题。

是否有其他人面临同样的问题?如果没有,那么请帮助我。由于

我写的样本代码在这里

object KafkaStreaming { 
    def main(args: Array[String]) { 
     if (args.length < 4) { 
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>") 
      System.exit(1) 
     } 

     val Array(zkQuorum, group, topics, numThreads) = args 
       val ssc = new StreamingContext("local", "KafkaWordCount", Seconds(2)) 
     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap 
     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) 
     val splitLines:DStream[String] = lines.flatMap(_.split("\n")) 

     val pairAlarm = splitLines.map(

       x=>{ 
          //Some Code 
          val alarmPair = new Tuple2(key, value) 
          alarmPair 
       } 

       ) 


      //pairAlarm.print 


      pairAlarm.foreachRDD(x=>{ 
      println("1 : "+x.first) 
      x.collect       // When the execution reaching this part its getting freeze 
      println("2: "+x.first) 
      }) 


       ssc.start() 
       ssc.awaitTermination() 
    } 
} 

回答

3

我不知道这是不是你的问题,但我也有类似的一个。我的程序经过几次迭代后才停止打印。 5-6次打印后,没有例外情况等停止打印。

更改此:

val ssc = new StreamingContext("local", "KafkaWordCount", Seconds(2)) 

这样:

val ssc = new StreamingContext("local[2]", "KafkaWordCount", Seconds(2)) 

解决了这个问题。 Spark至少需要2个线程才能运行,并且文档示例也有误导性,因为它们也仅使用local

希望这会有所帮助!

+0

谢谢Serejja。在我看到你的答案后,我意识到了这个错误。这真是一个非常愚蠢的问题。 :D谢谢 – 2014-09-01 10:23:47