2016-05-27 442 views
0

我试图在一个名为“test”的kafka主题上发送一个字数问题(在spark-scala中)的输出。看到下面的代码:Kafka producer.send()被producer.close()停止

val Dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 

val lines = Dstream.map(f => f._2) 
val words = lines.flatMap(_.split(" ")) 
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) 

wordCounts.foreachRDD(
     rdd => rdd.foreach(
     f => 
      { 
      val sendProps = new Properties() 
      sendProps.put("metadata.broker.list", brokers) 
      sendProps.put("serializer.class", "kafka.serializer.StringEncoder") 
      sendProps.put("producer.type", "async") 

      val config = new ProducerConfig(sendProps) 
      val producer = new Producer[String, String](config) 
      producer.send(new KeyedMessage[String, String]"test", f._1 + " " +f._2)) 
      producer.close(); 

      })) 

问题是一些单词随机丢失输出。我还注意到,如果我删除了语句

producer.close() 

没有数据丢失。

这是否意味着producer.close()中断producer.send()之前它其实是把数据缓冲区,由于其特定的元组没有被发送到消费者?如果是,我应该如何关闭生产者而不会冒数据丢失的风险?

以上是我最初的问题,Vale的答案解决了这个问题。

现在,当我更改producer.type属性 - 数据随机丢失。

sendProps.put("producer.type", "sync") 

为了澄清producer.send运行所有我需要把在输出主题的话。但是,有些单词会丢失,并且不会显示在输出Kafka主题中。

回答

1

这很奇怪。 close()方法应该等待send完成,这就是为什么引入close(time)方法的原因:as you can see here
因此,我使用Java 7. rdd.foreach是否在其中的每个分区上运行?或者它在每个元组上运行(我认为它正在执行)?
如果后者,你可以尝试一个rdd.foreachPartition(refer to this)?因为你正在为每一行创建一个制作人,而且我担心这可能会导致问题(尽管理论上它不应该)。

+0

我实际上是从一个kafka流中读取数据。是的,每个元组都会创建一个新的制作人。请在看到此内容后告诉我该做什么。 –

+1

我知道你在使用DStreams。我还不能用scala编写。如果你看看我的这个链接有一个“如何使用foreachRDD”,这表明你这种结构: 'dstream.foreachRDD | -rdd.foreachPartition | - 创建新的生产者,从分区获取的东西和发送它' – Vale

+0

谢谢淡水河谷。该解决方案工作得很好。我一直在敲我的头脑超过一天。你救了我的灵魂。 –