import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.scheduler._
import com.datastax.spark.connector._

object SparkMain extends App {
  System.setProperty("spark.cassandra.connection.host", "127.0.0.1")
  val conf = new SparkConf().setMaster("local[2]").setAppName("kafkaspark").set("spark.streaming.concurrentJobs", "4")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(5))
  val sqlContext = new SQLContext(sc)
  val host = "localhost:2181" // ZooKeeper quorum for the receiver-based Kafka stream
  val topicList = List("test", "fb")

  // One receiver-based stream per topic; each batch is written to Cassandra.
  topicList.foreach { topic =>
    val lines = KafkaUtils.createStream(ssc, host, topic, Map(topic -> 1)).map(_._2)
    //configureStream(topic, lines)
    lines.foreachRDD(rdd => rdd.map(test(_)).saveToCassandra("test", "rawdata", SomeColumns("key")))
  }

  // Listener that logs the lifecycle of receivers and batches.
  ssc.addStreamingListener(new StreamingListener {
    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
      println("Batch completed, total delay: " + batchCompleted.batchInfo.totalDelay.get.toString + " ms")
    }
    override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
      println("inside onReceiverStarted")
    }
    override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
      println("inside onReceiverError")
    }
    override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
      println("inside onReceiverStopped")
    }
    override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
      println("inside onBatchSubmitted")
    }
    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
      println("inside onBatchStarted")
    }
  })

  ssc.start()
  println("===========================")
  ssc.awaitTermination()
}

case class test(key: String)

If I run any single topic at a time, each topic works on its own. But when the topic list contains more than one topic, then after the DStreams are created from the Kafka topics, it just keeps printing "inside onBatchSubmitted" forever. Why does processing multiple Kafka topics with a single Spark Streaming context hang at batchSubmitted?

Answers


My bad, I had it misconfigured: it should be setMaster("local[*]") instead of setMaster("local[2]"). Each receiver-based input stream pins one core, so with two topics the two receivers occupied both threads of local[2] and no core was left to process the submitted batches.
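If you would rather stay on exactly two local threads, one possible workaround is to consume both topics through a single receiver, which leaves one core free for batch processing. A sketch replacing the per-topic loop above (the consumer group id "kafkaspark-group" is just an illustrative name):

  // One receiver for both topics instead of one receiver per topic,
  // so local[2] keeps a core free for processing the batches.
  val lines = KafkaUtils.createStream(ssc, host, "kafkaspark-group", Map("test" -> 1, "fb" -> 1)).map(_._2)
  lines.foreachRDD(rdd => rdd.map(test(_)).saveToCassandra("test", "rawdata", SomeColumns("key")))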


Changed local[2] to local[*] and it works fine:

val conf = new SparkConf().setMaster("local[*]").setAppName("kafkaspark").set("spark.streaming.concurrentJobs","4")
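As an aside, the receiver-less direct stream in spark-streaming-kafka pins no receiver cores at all, so even local[2] would be enough. A minimal sketch, assuming Kafka brokers reachable at localhost:9092 (the broker list, not the ZooKeeper quorum):

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  // Direct (receiver-less) stream: reads both topics without dedicating a core per receiver.
  val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
  val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("test", "fb")
  ).map(_._2)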