2016-04-05 176 views
1

我正在尝试在Go中编写RabbitMQ Consumer。假设从队列中一次取5个对象并处理它们。此外,假设成功处理其他人发送到死信队列5次然后丢弃,它应该无限运行并处理消费者的取消事件。 我有几个问题:Go中的RabbitMQ消费者

  1. 是否存在的BasicConsumer VS在EventingBasicConsumer任何概念的RabbitMQ,去Reference
  2. RabbitMQ中的Model是什么?它在RabbitMq-go中吗?
  3. 如何时,没有死信队列后再次ttl
  4. 重新排队他们什么是consumerTag说法在ch.Consume功能在下面的代码的意义
  5. 我们应该用发送对象channel.Get()channel.Consume()在这种情况下?

为了满足上述要求,我需要在下面的代码中做出什么样的改变。我问这是因为我找不到像样的RabbitMq-Go文档。

func main() { 

     consumer()   
    } 

    func consumer() { 

     objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}  
     initializeConn(&objConsumerConn.conn) 


     ch, err := objConsumerConn.conn.Channel() 
     failOnError(err, "Failed to open a channel") 
     defer ch.Close() 

     msgs, err := ch.Consume(
       objConsumerConn.queueName, // queue 
       "demo1",  // consumerTag 
       false, // auto-ack 
       false, // exclusive 
       false, // no-local 
       false, // no-wait 
       nil, // args 
     ) 
     failOnError(err, "Failed to register a consumer") 

     forever := make(chan bool) 

     go func() { 
      for d := range msgs {     
       k := new(EventCaptureData) 
       b := bytes.Buffer{} 
       b.Write(d.Body) 
       dec := gob.NewDecoder(&b) 
       err := dec.Decode(&k) 
       d.Ack(true) 

       if err != nil { fmt.Println("failed to fetch the data from consumer", err); } 
        fmt.Println(k)       
      } 
     }()  

     log.Printf(" Waiting for Messages to process. To exit press CTRL+C ") 
     <-forever 

    } 

被修改的问题:

我已延迟的消息的处理如在链接link1link2建议。但问题是即使在ttl之后,消息也会从死信队列中恢复到原始队列。我正在使用RabbitMQ 3.0.0。任何人都可以指出什么是问题?

+0

尝试AMQP包与兔子互动,也有一个非常体面的文件https://godoc.org/github.com/streadway/amqp – PerroVerd

+0

@PerroVerd这是我在用。 – Naresh

回答

1

在 RabbitMq-go Reference中是否有BasicConsumer vs EventingBasicConsumer的概念?

不完全是,但Channel.GetChannel.Consume调用服务类似的概念。使用Channel.Get时,如果有任何可用信息,您将获得第一条消息的非阻塞呼叫,或返回ok=false。通过Channel.Consume排队的消息被传递到一个通道。

什么是RabbitMQ中的模型,它在RabbitMq-go中有吗?

如果你指的是在C#中的RabbitMQ的IModelConnection.CreateModel,这是从C#lib中的东西,而不是从自己的RabbitMQ。这只是一个试图从RabbitMQ“Channel”术语中抽象出来的东西,但它从来没有出现过。

如何时,没有死信队列和TTL后再次 重新排队它们

使用delivery.Nackrequeue=false发送的对象。

消费者标签参数在ch中的意义是什么?在下面的代码中使用 函数

ConsumerTag只是一个消费者标识符。它可用于取消频道channel.Cancel,并识别负责传送的消费者。与channel.Consume一起发送的所有消息都将设置为ConsumerTag字段。

我们应该在这种情况下使用channel.Get()channel.Consume()吗?

我认为channel.Get()几乎从来没有比channel.Consume()好。有了channel.Get,你会轮询队列并浪费CPU什么也不做,这在Go中没有意义。

我需要在下面的代码中做出什么修改才能满足 以上的要求。

  1. 既然你在一个时间是批量处理5,你可以从消费渠道接收,一旦它得到了5个交货调用另一个函数来处理他们的goroutine。

  2. 要确认或发送到死信队列,您将使用delivery.Ackdelivery.Nack函数。您可以使用multiple=true并为批次调用一次。一旦消息进入死信队列,您必须检查delivery.Headers["x-death"]标题,了解其已死信的次数,并在已经重试5次时呼叫delivery.Reject

  3. 使用channel.NotifyCancel来处理取消事件。

+0

非常感谢你的详细解释。 – Naresh

+0

@佩德罗..我有一个问题。如果我使用d.Ack(true)或d.Ack(false),它不会将消息发布在死信队列中。在d.Nack(true,false)的情况下发布。但是在ttl之后它会从那里丢弃消息。那么,实现相同 – Naresh

+0

我编辑过的问题的价值是什么。你可以看看它吗 – Naresh