2016-12-05 141 views
0

第一批: - 我想从100平面文件中拉出数据并加载到一个数组并将它们作为字节数组逐个插入到kafka生产者中。Golang Kafka没有消耗所有消息偏移西南

第二批: - 我正在消费卡夫卡消费者,然后将它们插入NoSQL数据库。

我在Shopify sarama golang包的Kafka配置文件中使用Offsetnewset。

我可以接收和插入消息给卡夫卡,但消费时我只能得到第一条消息。因为我在sarama配置中给了最新的Offset。 我怎样才能得到这里的所有数据。

回答

1

这是很难能够告诉无需任何代码如何卡夫卡配置一些或更深入的解释(即:主题,分区,...),所以很少快速检查来我的脑海:

  1. 假设你开始与OffsetNewest消耗设置你开始生产之前,有一两件事,也许发生的事情是,你是不是从该主题的所有分区的消耗,对于以萨拉马文档,你必须明确地由消费每个分区创建PartitionConsumers。从例子中https://godoc.org/github.com/Shopify/sarama#Consumer

    partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest) 
    if err != nil { 
        panic(err) 
    } 
    
    ... 
    
    consumed := 0 
    ConsumerLoop: 
    for { 
        select { 
        case msg := <-partitionConsumer.Messages(): 
         log.Printf("Consumed message offset %d\n", msg.Offset) 
         consumed++ 
        case <-signals: 
         break ConsumerLoop 
        } 
    } 
    
  2. 你,其实,也开始消费产生的所有事件,因此,指针读他们一切都不OffsetNewest但OffsetOldest代替。

我很抱歉不能给你一个更加有用的答案,但也许如果你粘贴一些代码或提供更多的细节,我们可以帮助更。