0

我想并行读取Kafka消息,从而并行处理它们。我的卡夫卡话题有10个分区。我试图创建5个DStream并应用Union方法来操作单个DStream。这里是我试过至今代码:在Kafka Spark流中读取并处理并行性

def main(args: scala.Array[String]): Unit = { 

    val properties = readProperties() 

    val streamConf = new SparkConf().setMaster("local[2]").setAppName("KafkaStream") 
    val ssc = new StreamingContext(streamConf, Seconds(1)) 
    // println("defaultParallelism: "+ssc.sparkContext.defaultParallelism) 
    ssc.sparkContext.setLogLevel("WARN") 
    val numPartitionsOfInputTopic = 5 
    val group_id = Random.alphanumeric.take(4).mkString("consumer_group") 
    val kafkaStream = { 
     val kafkaParams = Map("zookeeper.connect" -> properties.getProperty("zookeeper_connection_str"), 
          "group.id" -> group_id, 
          "zookeeper.connection.timeout.ms" -> "3000")      

     val streams = (1 to numPartitionsOfInputTopic).map { _ => 
          KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
          ssc, kafkaParams, Map("kafka_topic" -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2) 
     } 
    val unifiedStream = ssc.union(streams) 
    val sparkProcessingParallelism = 5 
    unifiedStream.repartition(sparkProcessingParallelism) 
    } 

    kafkaStream.foreachRDD { x => 
    x.foreach {  
     msg => println("Message: "+msg) 
     processMessage(msg) 
    }  
    } 

    ssc.start() 
    ssc.awaitTermination() 
} 

在执行时,它甚至没有收到一个消息,更不用说进一步处理它。我在这里错过了什么吗?如果需要,请建议进行更改。谢谢。

回答

0

我强烈建议切换到直接流。为什么?

Direct Stream默认将并行度设置为您在Kafka中分区的数量。没有什么必须做更多工作 - 只是创建直接流如果您创建5个DStreams做你的工作:)

,默认情况下将在5线程读取,一个非直接DSTREAM =一个线程

+0

Gaweda嗨,谢谢你提出一个替代方案。我只是尝试直接流方法。尽管如此,这些消息依次得到处理。处理时间实际上不到一秒钟。为了测试它,我在消息处理函数中加入了'Thread.sleep(10000)'。这里是我正在形成的KafkaParams的地图: 'val kafkaParams = Map(“metadata.broker.list” - > localhost:9092, “group.id” - >“dsdc”, “auto.offset。重置“ - >”最大“)' 我需要改变一些东西吗? – Arjun