2016-03-08

Spark Streaming does not start multiple threads

I use Spark Streaming to receive data from Kafka like this:

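val conf = new SparkConf() 
conf.setMaster("local[*]").setAppName("KafkaStreamExample") 
    .setSparkHome("/home/kufu/spark/spark-1.5.2-bin-hadoop2.6") 
    // spark.executor.extraClassPath is a Spark property, so it is set with set();
    // setExecutorEnv would export it as an environment variable instead.
    .set("spark.executor.extraClassPath", "target/scala-2.11/sparkstreamexamples_2.11-1.0.jar") 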

val threadNum = 3 

val ssc = new StreamingContext(conf, Seconds(2)) 
val topicMap = Map(consumeTopic -> 1) 

val dataRDDs: IndexedSeq[InputDStream[(String, String)]] = approachType match { 
    case KafkaStreamJob.ReceiverBasedApproach => 
        (1 to threadNum).map(_ => 
            KafkaUtils.createStream(ssc, zkOrBrokers, "testKafkaGroupId", topicMap)) 
    case KafkaStreamJob.DirectApproach => 
        (1 to threadNum).map(_ => 
            KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
                ssc, Map[String, String]("metadata.broker.list" -> zkOrBrokers), 
                Set[String](consumeTopic))) 
} 

//dataRDDs.foreach(_.foreachRDD(genProcessing(approachType))) 
val dataRDD = ssc.union(dataRDDs) 
dataRDD.foreachRDD(genProcessing(approachType)) 

ssc.start() 
ssc.awaitTermination() 

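genProcessing generates a function that processes the data, and each call takes about 5 seconds (it sleeps for 5 seconds). The code looks like this: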

def eachRDDProcessing(rdd:RDD[(String, String)]):Unit = { 
    if(count>max) throw new Exception("Stop here") 
    println("--------- num: "+count+" ---------") 

    val batchNum = count 
    val curTime = System.currentTimeMillis() 

    Thread.sleep(5000) 

    val family = approachType match { 
        case KafkaStreamJob.DirectApproach => KafkaStreamJob.DirectFamily 
        case KafkaStreamJob.ReceiverBasedApproach => KafkaStreamJob.NormalFamily 
    } 

    val families = KafkaStreamJob.DirectFamily :: KafkaStreamJob.NormalFamily :: Nil 

    val time = System.currentTimeMillis().toString 

    val messageCount = rdd.count() 

    rdd.foreach(tuple => { 
        val hBaseConn = new HBaseConnection(KafkaStreamJob.rawDataTable, 
            KafkaStreamJob.zookeeper, families) 
        hBaseConn.openOrCreateTable() 
        val puts = new java.util.ArrayList[Put]() 
        val strs = tuple._2.split(":") 
        val row = strs(1) + ":" + strs(0) + ":" + time 
        val put = new Put(Bytes.toBytes(row)) 
        put.add(Bytes.toBytes(family), Bytes.toBytes(KafkaStreamJob.tableQualifier), 
            Bytes.toBytes("batch " + batchNum.toString + ":" + strs(1))) 
        puts.add(put) 
        hBaseConn.puts(puts) 
        hBaseConn.close() 
    }) 

    count+=1 
    println("--------- add "+messageCount+" messages ---------") 
} 
eachRDDProcessing 

But Spark Streaming does not start multiple threads. My machine has 8 cores, and Spark is running on a single node.

Answer

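I don't think Spark Streaming will start threads for you, especially on the driver. The point is that if you have multiple nodes, the distributed part of your genProcessing (the closure passed to rdd.foreach) will run on different nodes.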
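
To make the driver/executor split concrete, here is a minimal sketch, not the asker's job. It assumes Spark Streaming is on the classpath and swaps the Kafka input for a queue-backed test stream. The object name `DriverVsExecutorSketch` and the helper `where` are made up for illustration. It prints the thread that runs the `foreachRDD` body (driver side, once per batch) and the threads that run the per-partition tasks. Note that in the question, `Thread.sleep(5000)` sits in the `foreachRDD` body, outside `rdd.foreach`, so as far as I can tell that sleep happens on the driver side rather than in parallel tasks.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

object DriverVsExecutorSketch {
  // Hypothetical helper: tags a label with the thread currently running it.
  def where(label: String): String =
    s"$label on thread ${Thread.currentThread().getName}"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("DriverVsExecutorSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Queue-backed test stream standing in for the Kafka input (assumption).
    val queue = mutable.Queue[RDD[Int]](ssc.sparkContext.parallelize(1 to 8, 4))

    ssc.queueStream(queue).foreachRDD { rdd =>
      // This body runs in the driver process, once per batch.
      println(where("foreachRDD body"))
      // This closure runs inside tasks, one task per partition.
      rdd.foreachPartition { _ => println(where("partition task")) }
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // let a few batches run, then shut down
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

With `local[4]` everything lives in one JVM, but the thread names should still differ between the two print statements, which is the distinction the answer is pointing at.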

Also, if you call rdd.foreachPartition(...), I would expect it to get better parallelism.
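
A rough sketch of what the `foreachPartition` version could look like, assuming Spark is on the classpath. `FakeConnection` is a made-up stand-in for the `HBaseConnection` class from the question (which is not shown there), and the sample data is invented. The row-key layout is copied from the question. The clearest gain is that the connection is opened once per partition instead of once per record. Whether parallelism itself improves depends on how many partitions the RDD has.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the question's HBaseConnection; it only buffers rows.
class FakeConnection {
  private val rows = ArrayBuffer.empty[String]
  def put(row: String): Unit = rows += row
  def close(): Unit = println(s"closed after ${rows.size} rows")
}

object ForeachPartitionSketch {
  // Same row-key layout as the question: second field, first field, timestamp.
  def rowKey(value: String, time: String): String = {
    val strs = value.split(":")
    strs(1) + ":" + strs(0) + ":" + time
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("ForeachPartitionSketch")
    val sc = new SparkContext(conf)

    // Invented sample records in the "field0:field1" shape the question splits on.
    val rdd = sc.parallelize(Seq("k1" -> "a:1", "k2" -> "b:2", "k3" -> "c:3", "k4" -> "d:4"), 2)
    val time = System.currentTimeMillis().toString

    rdd.foreachPartition { records =>
      // Opened once per partition, inside the task, so nothing is serialized from the driver.
      val conn = new FakeConnection
      try records.foreach { case (_, value) => conn.put(rowKey(value, time)) }
      finally conn.close()
    }

    sc.stop()
  }
}
```

In the real job the same shape would apply with `HBaseConnection` in place of `FakeConnection`: open it at the top of the partition closure, collect the `Put` objects for all records, write them in one call, then close.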