
I am new to Kafka. I am testing Kafka with two ZooKeeper instances and two broker instances. I created a test topic "topicA"; below is its description. The Kafka consumer does not consume any data.

Topic:topicA PartitionCount:1  ReplicationFactor:1  Configs: 
     Topic: topicA Partition: 0 Leader: 2  Replicas: 2  Isr: 2 

The topic has one partition on Kafka broker 2, and its only replica is on the same broker. I am using the Kafka producer (org.apache.kafka kafka-clients 0.9.0.1) to send messages to the broker.

Producer config:

props.put("bootstrap.servers", "***:12900"); // this is kafka broker url 
props.put("block.on.buffer.full", "true"); 
props.put("request.required.acks", "1"); 
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
props.put("partition.assignment.strategy", "range"); 

I produce 10K messages in a loop:

for (int i = 0; i < 10000; i++) {
    // Note: the three-arg ProducerRecord constructor is (topic, key, value),
    // so "partitionName" here is actually the record key.
    kafkaProducer.send(new ProducerRecord<String, String>(
            topic, "partitionName",
            String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
    System.out.println("Sent Message - " + i + " Successfully");
}

But I cannot get any messages in my consumer.

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + "---->" + record.value());
    }
}

My consumer props:

bootstrap.servers = *:12900 // this is my kafka broker 
group.id = test 
key.deserializer = org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer = org.apache.kafka.common.serialization.StringDeserializer 
enable.auto.commit=true 
# fast session timeout makes it more fun to play with failover 
session.timeout.ms=10000 

# These buffer sizes seem to be needed to avoid consumer switching to 
# a mode where it processes one bufferful every 5 seconds with multiple 
# timeouts along the way. No idea why this happens. 
fetch.min.bytes=50000 
receive.buffer.bytes=262144 
max.partition.fetch.bytes=2097152 
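
For completeness, here is a minimal, self-contained sketch of how the props above are wired into the 0.9.x consumer. The topic name "topicA" and the subscribe() call are assumptions on my part; poll() returns nothing if subscribe() is never called, so that is worth double-checking:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "*:12900"); // kafka broker
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        // subscribe() is required before poll() will return any records
        kafkaConsumer.subscribe(Collections.singletonList("topicA"));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + "---->" + record.value());
            }
        }
    }
}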

Errors on broker 1 (a BufferUnderflowException is repeated many times):

[Controller-1-to-broker-1-send-thread], Controller 1 epoch 6 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:1;ControllerEpoch:6;CorrelationId:10;ClientId:id_1-host_null-port_12900;Leaders:id:1,host:*,port:12900,id:2,host:*,port:12900;PartitionState:(__consumer_offsets,32) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,16) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,49) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,44) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,28) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,17) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,23) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,7) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,4) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,29) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,35) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,3) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,24) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,41) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,0) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,38) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,13) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,8) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,5) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,39) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,36) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,40) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,45) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,15) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,33) -> 
(LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,37) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,21) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,6) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,11) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,20) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,47) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,2) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,27) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,34) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,9) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,22) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,42) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,14) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,25) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,10) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,48) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,31) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,18) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,19) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,12) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,46) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,43) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,1) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,26) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,30) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2) to broker id:1,host:*,port:12900. Reconnecting to broker. 
java.io.IOException: Broken pipe 
kafka-request-handler-0]: [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [__consumer_offsets,32],[__consumer_offsets,16],[__consumer_offsets,44],[__consumer_offsets,28],[__consumer_offsets,4],[__consumer_offsets,24],[__consumer_offsets,0],[__consumer_offsets,38],[__consumer_offsets,8],[__consumer_offsets,36],[__consumer_offsets,40],[__consumer_offsets,6],[__consumer_offsets,20],[__consumer_offsets,2],[__consumer_offsets,34],[__consumer_offsets,22],[__consumer_offsets,42],[__consumer_offsets,14],[__consumer_offsets,10],[__consumer_offsets,48],[__consumer_offsets,18],[__consumer_offsets,12],[__consumer_offsets,46],[__consumer_offsets,26],[__consumer_offsets,30] 
2016-07-31 06:48:11,045 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-0 with log end offset 0 
2016-07-31 06:48:11,054 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,0] in log/kafka_1 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}. 
2016-07-31 06:48:11,058 [WARN ][kafka-request-handler-0]: Partition [__consumer_offsets,0] on broker 1: No checkpointed highwatermark is found for partition [__consumer_offsets,0] 
2016-07-31 06:48:11,069 [INFO ][kafka-scheduler-4]: Loading offsets from [__consumer_offsets,0] 
2016-07-31 06:48:11,072 [INFO ][kafka-scheduler-4]: Finished loading offsets from [__consumer_offsets,0] in 3 milliseconds. 
2016-07-31 06:59:31,945 [ERROR][kafka-network-thread-12900-2]: Closing socket for /host because of error 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException 
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66) 
    at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85) 
    at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29) 
    at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50) 
    at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50) 
    at kafka.network.RequestChannel$Request.(RequestChannel.scala:50) 
    at kafka.network.Processor.read(SocketServer.scala:450) 
    at kafka.network.Processor.run(SocketServer.scala:340) 

Log from broker 2 (there are no errors on this broker):

2016-07-31 06:48:10,972 [INFO ][kafka-request-handler-0]: [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [__consumer_offsets,49],[__consumer_offsets,17],[__consumer_offsets,23],[__consumer_offsets,7],[__consumer_offsets,29],[__consumer_offsets,35],[__consumer_offsets,3],[__consumer_offsets,41],[__consumer_offsets,13],[__consumer_offsets,5],[__consumer_offsets,39],[__consumer_offsets,45],[__consumer_offsets,15],[__consumer_offsets,33],[__consumer_offsets,37],[__consumer_offsets,21],[__consumer_offsets,11],[__consumer_offsets,47],[__consumer_offsets,27],[__consumer_offsets,9],[__consumer_offsets,25],[__consumer_offsets,31],[__consumer_offsets,19],[__consumer_offsets,43],[__consumer_offsets,1] 
2016-07-31 06:48:10,990 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-29 with log end offset 0 
2016-07-31 06:48:10,994 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,29] in log/kafka_2 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}. 
2016-07-31 06:48:10,996 [WARN ][kafka-request-handler-0]: Partition [__consumer_offsets,29] on broker 2: No checkpointed highwatermark is found for partition [__consumer_offsets,29] 
2016-07-31 06:48:10,998 [INFO ][kafka-scheduler-5]: Loading offsets from [__consumer_offsets,29] 
2016-07-31 06:48:11,011 [INFO ][kafka-scheduler-5]: Finished loading offsets from [__consumer_offsets,29] in 13 milliseconds. 
2016-07-31 06:48:11,023 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-45 with log end offset 0 
2016-07-31 06:48:11,025 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,45] in log/kafka_2 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}. 
2016-07-31 06:48:11,913 [INFO ][kafka-request-handler-0]: [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([ 

1) Please let me know why my consumer is not able to get any messages.
2) Do my producer and consumer configs look OK? Should my producer/consumer connect to ZooKeeper instead of directly to the brokers?
3) What does epoch mean in the controller?
4) What does the following warning mean? "No checkpointed highwatermark is found for partition"


BTW: ZooKeeper needs an odd number of servers to work correctly! –

Answer


You are apparently running a Kafka broker version that is older than the version of your clients. Double-check your Kafka broker and client versions.

Your error says the broker failed to process a JoinGroupRequest coming from your consumer. Most likely, this means your client is sending a version of JoinGroupRequest that the broker does not understand.

In general, your Kafka brokers should be at a version equal to or higher than that of your clients in order to avoid this kind of error.

The fix is to either upgrade your Kafka brokers or downgrade the clients you use.
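
If you are unsure which kafka-clients version is actually on your classpath, you can print it at runtime. This is a quick sketch assuming the standard kafka-clients jar, which ships with AppInfoParser:

import org.apache.kafka.common.utils.AppInfoParser;

public class ClientVersionCheck {
    public static void main(String[] args) {
        // Prints the kafka-clients version found on the classpath, e.g. "0.9.0.1".
        // Compare it with the broker version, visible in the jar names under the
        // broker's libs/ directory.
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
    }
}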

Your configs look mostly fine, except for this one:

props.put("partition.assignment.strategy", "range"); 

This one is useless here because it relates to the old consumer; it is most likely just ignored by your producer, so you can safely remove it.
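
For reference, a cleaned-up new-producer (0.9.x) config might look like the sketch below. This is only a sketch of your posted config, not a drop-in fix; note that the new producer expects acks rather than the old request.required.acks, which it does not recognize:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "***:12900"); // your kafka broker url
        props.put("acks", "1"); // new-producer name for request.required.acks
        props.put("block.on.buffer.full", "true");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // partition.assignment.strategy removed: it is a consumer-side setting
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}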

Epoch is something like a version or generation ID of the cluster state. It is what allows ordinary brokers to synchronize state correctly with the controller.
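
To make that concrete with a toy model (purely illustrative; this is not Kafka's actual code): a broker remembers the highest controller epoch it has seen and rejects requests stamped with an older epoch, so commands from a deposed controller cannot overwrite newer cluster state.

class BrokerEpochFence {
    // Toy illustration of epoch fencing; Kafka's real implementation differs.
    private int highestControllerEpochSeen = -1;

    synchronized boolean acceptControllerRequest(int requestEpoch) {
        if (requestEpoch < highestControllerEpochSeen) {
            return false; // stale controller: reject the request
        }
        highestControllerEpochSeen = requestEpoch;
        return true; // current (or newer) controller: accept and remember
    }
}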