2017-09-12 19 views
0

我正在开发一个使用Kafka作为消息发布/子工具的系统。Kafka(Re-)加入小组坚持超过2个主题

数据由斯卡拉脚本生成的:

val kafkaParams = new Properties() 
    kafkaParams.put("bootstrap.servers", "localhost:9092") 
    kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    kafkaParams.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    kafkaParams.put("group.id", "test_luca") 

    //kafka producer 
    val producer = new KafkaProducer[String, String](kafkaParams) 

    //Source list 
    val s1 = new java.util.Timer() 
    val tasks1 = new java.util.TimerTask { 
     def run() = { 
      val date = new java.util.Date 
      val date2 = date.getTime() 
      val send = ""+ date2 + ", 45.1234, 12.5432, 4.5, 3.0" 
      val data = new ProducerRecord[String,String]("topic_s1", send) 
      producer.send(data) 
     } 
    } 
    s1.schedule(tasks1, 1000L, 1000L) 

    val s2 = new java.util.Timer() 
    val tasks2 = new java.util.TimerTask { 
     def run() = { 
      val date = new java.util.Date 
      val date2 = date.getTime() 
      val send = ""+ date2 + ", 1.111, 9.999, 10.4, 10.0" 
      val data = new ProducerRecord[String,String]("topic_s2", send) 
      producer.send(data) 
     } 
    } 
    s2.schedule(tasks2, 2000L, 2000L) 

我需要测试在某些特定情况下的卡夫卡表演。在其中一种情况下,我有一个使用主题为“topic_s1”和“topic_s2”的数据的其他脚本,对它们进行详细说明,然后生成包含不同主题(topic_s1b和topic_s2b)的新数据。随后,这些详细的数据被Apache Spark Streaming脚本使用。

如果我省略消费者/生产者脚本(我只有一个拥有2个主题和Spark脚本的卡夫卡制作人),一切正常。

如果我使用完整配置(包含2个主题的1个kafka生产者,使用来自kafka生产者的数据的“中间件”脚本,详细说明它们并用新主题生成新数据,1个使用新主题消费数据的spark脚本)Spark Streaming脚本卡住了INFO AbstractCoordinator: (Re-)joining group test_luca

我在本地运行所有内容,并且不对kafka和zookeeper配置进行修改。

有什么建议吗?

UPDATE:火花脚本:

val sparkConf = new SparkConf().setAppName("SparkScript").set("spark.driver.allowMultipleContexts", "true").setMaster("local[2]") 
val sc = new SparkContext(sparkConf) 

val ssc = new StreamingContext(sc, Seconds(4)) 

case class Thema(name: String, metadata: JObject) 
case class Tempo(unit: String, count: Int, metadata: JObject) 
case class Spatio(unit: String, metadata: JObject) 
case class Stt(spatial: Spatio, temporal: Tempo, thematic: Thema) 
case class Location(latitude: Double, longitude: Double, name: String) 

case class Data(location: Location, timestamp: Long, measurement: Int, unit: String, accuracy: Double) 
case class Sensor(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Data, stt: Stt) 


case class Datas(location: Location, timestamp: Long, measurement: Int, unit: String, accuracy: Double) 
case class Sensor2(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Datas, stt: Stt) 


val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092", 
    "key.deserializer" -> classOf[StringDeserializer].getCanonicalName, 
    "value.deserializer" -> classOf[StringDeserializer].getCanonicalName, 
    "group.id" -> "test_luca", 
    "auto.offset.reset" -> "latest", 
    "enable.auto.commit" -> (false: java.lang.Boolean) 
) 

val topics1 = Array("topics1") 
val topics2 = Array("topics2") 

val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)) 
val stream2 = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics2, kafkaParams)) 

val s1 = stream.map(record => { 
    implicit val formats = DefaultFormats 
    parse(record.value).extract[Sensor] 
} 
) 
val s2 = stream2.map(record => { 
    implicit val formats = DefaultFormats 
    parse(record.value).extract[Sensor2] 
} 
) 

val f1 = s1.map { x => x.sensor_name } 
f1.print() 
val f2 = s2.map { x => x.sensor_name } 
f2.print() 

感谢 卢卡

+0

请显示您的火花流脚本代码。 – GuangshengZuo

+0

@广生I我已经上传了Spark Script –

回答

1

也许你应该改变火花流脚本的group.id。我猜你的“中间件”脚本的使用者与你的spark流脚本的使用者有相同的group.id。那么可怕的事情就会发生。

在kafka中,消费者群体是话题的真正订阅者,群组中的消费者只是一个拆分工作者,所以对于您的情况,您应该在中间件脚本消费者和spark流脚本消费者中使用不同的group.id。

在你没有中间脚本的第一次尝试中,它只是因为这个。