2016-03-15 96 views
3

我正在使用Kafka 0.8.2,并且在我的客户中出现错误,提示“offset commit failed with ...”。当看着主题“__consumer_offsets”。我看到它有50个分区数。这是正常的吗?我只能通过删除所有Kafka日志并重新启动我的Kafka服务器来解决此问题。有一种方法可以在达到特定数量的分区时删除此主题,或者我承诺我的偏移量错误吗?kafka __consumer_offsets主题有过多的分区计数

这是我如何将我的补偿:

public void commitOffsets(BlockingChannel channel, String topic, String groupid, int partition, String clientName, int corrilationid, long offset) throws Exception{ 

    if (commitTryCount > 100){ 
     throw new Exception("Offset commit failed with " + channel.host()); 
    } 

    long now = System.currentTimeMillis(); 
    Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>(); 
    //for (int i = 0; i < this.totalPartitions; i++){ 
     TopicAndPartition topicPartition = new TopicAndPartition(topic, partition); 
     offsets.put(topicPartition, new OffsetAndMetadata(offset, topic, now)); 
    //}  

    //initialize offset commit 
    OffsetCommitRequest commitRequest = new OffsetCommitRequest(groupid, offsets, corrilationid, clientName, (short) 1); 
    channel.send(commitRequest.underlying()); 
    OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer()); 
    if (commitResponse.hasError()){   
     for (Object partitionErrorCode: commitResponse.errors().values()){ 
      if (Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.OffsetMetadataTooLargeCode()){ 
       //reduce the size of the metadata and retry 
       offset--; 
       commitOffsets(channel, topic, groupid, partition, clientName, corrilationid, offset); 
       commitTryCount++; 
      } else if (Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.NotCoordinatorForConsumerCode() 
        || Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) { 
       //discover new coordinator and retry 
       int newCorrilation = corrilationid; 
       newCorrilation++; 
       this.channel = discoverChannel(channel.host(), port, groupid, clientName, newCorrilation); 
       commitOffsets(this.channel, topic, groupid, partition, clientName, newCorrilation, offset); 
       commitTryCount++; 
      } else{ 
       //retry 
       commitOffsets(channel, topic, groupid, partition, clientName, corrilationid, offset); 
       commitTryCount++; 
      }//end of else    
     }//end of for 
    }//end of if 
}//end of method 

回答

1

我想通了之后,我贴我的代码。当提交成功时,我忘了将变量“commitTryCount”设置为0。我仍然想知道__consumer_offsets话题有50个分区是否正常?

+0

50是'offsetsets.topic.num.partitions' config的默认值,所以没关系。你可以看看默认[这里](http://kafka.apache.org/documentation.html) – serejja

0

是的,消费者偏移的50个分区是默认值。要更改,请设置offsets.topic.num.partitions属性。

相关问题