2016-11-16 332 views
2

既然Golang Kafka库(sarama)提供的消费者组功能没有任何外部库的帮助,那么kafka 10.我如何获得消费者正在处理的当前消息偏移量在任何特定时间组?如何获取Golang Kafka中的分区的消费者组偏移量10

此前我使用kazoo-go(https://github.com/wvanbergen/kazoo-go)来获取我的消费者群消息的偏移量,因为它存储在Zookeeper中。现在我使用sarama-cluster(https://github.com/bsm/sarama-cluster),我不确定使用哪个API来获取消费者组消息抵消。

回答

1

我也在与Sarama和Kafka合作来获得一个主题的抵消。

您可以使用以下代码获得偏移量。

package main 

    import (
    "gopkg.in/Shopify/sarama" 
    "fmt" 
    ) 

    func main(){ 
     client , err := sarama.Client([]string{"localhost:9092"},nil) // I am not giving any configuration 
     if err != nil { 
      panic(err) 
     } 
     lastoffset, err := client.GetOffset("topic-test",0,sarama.OffsetNewest) 
     if err != nil { 
      panic(err) 
     } 
     fmt.Println("Last Commited Offset ",lastoffset) 
    } 

让我知道这是你要找的答案,如果它是有帮助的。

+0

This Works。我认为我的Kafka集群端出现了问题,导致此API调用没有返回任何数据。现在工作。 – tazo

+0

这是否会返回分区产生的最后一条消息的偏移量,或消费者为分区标记并提交的最后一条消息的偏移量? –

+2

@LorenzoBelli该代码返回分区产生的最新偏移量,而不是消费者提交的偏移量 –

相关问题