2016-11-12 59 views
2
val props = new Properties() 
props.put("bootstrap.servers", "foo:9092,bar:9092") 
props.put("acks", "all") 
props.put("retries", 1) 
props.put("batch.size", 10000) 
props.put("linger.ms", 5) 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 

try { 
    producer.send(new ProducerRecord[String, String](topic, key, msg.toJson)).get() 
    true 
} catch { 
    case ex: Throwable => { 
     println(ex) 
     false 
    } 
} 

此代码抛出一个异常失败后48毫秒

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 48 ms.\n\tat 
org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)\n\tat 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)\n\tat 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)\ 

注意,我张贴对生产集群数据,因此它的启动和运行,很多应用已经成功地发布消息到主题更新元数据。它只是我的代码没有发布。

+0

我注意到max.block.ms被设置为48 ms,这是一个非常低的值。为什么你设置这样的值来检索元数据? – amethystic

+0

@amethystic这不是在给定的配置中,你从哪里得到的? – annedroiid

+0

从“48ms后无法更新元数据”。 – amethystic

回答

1

看起来主机名或端口有问题,因为您无法连接生产者和TimeoutException。你有没有尝试使用相同的生产者配置运行kafka-console-producer.sh?它真的在那台机器上工作吗?也许,与代理的连接受SSL或SASL保护。不要忘记打开config/tools-log4j.properties中的TRACE日志记录,这将帮助您调试您的问题。

0

我之前收到了相同的超时异常,当我试图使用控制台生产者发送消息时,我得到了WARN Error while fetching metadata with correlation id 656 : {metadata-1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

对于我自己来说,停止并重新启动经纪人让所有的事情都能够重新开始。