我的印象是,在开启同步的两个经纪商的情况下,即使一家经纪商失败,我的卡夫卡设置也应该继续工作。为什么卡夫卡不能继续在一个经纪人的失败中工作?
为了测试它,我做了一个名为topicname的新主题。它的描述如下:
Topic:topicname PartitionCount:1 ReplicationFactor:1 Configs:
Topic: topicname Partition: 0 Leader: 0 Replicas: 0 Isr: 0
然后我跑了以下列方式producer.sh和consumer.sh:
bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9095 sync --topic topicname
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicname --from-beginning
直到双方经纪正在努力,我看到了正在通过正确接收的消息消费者,但是当我通过kill
命令杀死了经纪人的一个实例时,消费者停止向我展示任何新消息。相反,它向我显示了以下错误消息:
WARN [ConsumerFetcherThread-console-consumer-57116_ip-<internalipvalue>-1438604886831-603de65b-0-0], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 865; ClientId: console-consumer-57116; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [topicname,0] -> PartitionFetchInfo(9,1048576). Possible cause: java.nio.channels.ClosedChannelException (kafka.consumer.ConsumerFetcherThread)
[2015-08-03 12:29:36,341] WARN Fetching topic metadata with correlation id 1 for topics [Set(topicname)] from broker [id:0,host:<hostname>,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
您将复制因子设置为1,这意味着您的数据将保存在一个节点上。要复制数据集复制因子2(不能将其设置为高于可用集群中的代理数)。然后再试一次 – serejja
@serejja我试着将复制设置为2.当我杀了一个经纪人时,我得到了以下错误:kafka.common.FailedToSendMessageException:3次尝试后未能发送消息。' –