val props = new Properties()
props.put("bootstrap.servers", "foo:9092,bar:9092")
props.put("acks", "all")
props.put("retries", 1)
props.put("batch.size", 10000)
props.put("linger.ms", 5)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
try {
producer.send(new ProducerRecord[String, String](topic, key, msg.toJson)).get()
true
} catch {
case ex: Throwable => {
println(ex)
false
}
}
此代码抛出一个异常失败后48毫秒
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 48 ms.\n\tat
org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)\n\tat
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)\n\tat
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)\
注意,我张贴对生产集群数据,因此它的启动和运行,很多应用已经成功地发布消息到主题更新元数据。它只是我的代码没有发布。
我注意到max.block.ms被设置为48 ms,这是一个非常低的值。为什么你设置这样的值来检索元数据? – amethystic
@amethystic这不是在给定的配置中,你从哪里得到的? – annedroiid
从“48ms后无法更新元数据”。 – amethystic