我正在尝试在Go中编写RabbitMQ Consumer。假设从队列中一次取5个对象并处理它们。此外,假设成功处理其他人发送到死信队列5次然后丢弃,它应该无限运行并处理消费者的取消事件。 我有几个问题:Go中的RabbitMQ消费者
- 是否存在的
BasicConsumer
VS在EventingBasicConsumer
任何概念的RabbitMQ,去Reference? - RabbitMQ中的
Model
是什么?它在RabbitMq-go中吗? - 如何时,没有死信队列后再次
ttl
- 重新排队他们什么是
consumerTag
说法在ch.Consume
功能在下面的代码的意义 - 我们应该用发送对象
channel.Get()
或channel.Consume()
在这种情况下?
为了满足上述要求,我需要在下面的代码中做出什么样的改变。我问这是因为我找不到像样的RabbitMq-Go文档。
func main() {
consumer()
}
func consumer() {
objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}
initializeConn(&objConsumerConn.conn)
ch, err := objConsumerConn.conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
msgs, err := ch.Consume(
objConsumerConn.queueName, // queue
"demo1", // consumerTag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
k := new(EventCaptureData)
b := bytes.Buffer{}
b.Write(d.Body)
dec := gob.NewDecoder(&b)
err := dec.Decode(&k)
d.Ack(true)
if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
fmt.Println(k)
}
}()
log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
<-forever
}
被修改的问题:
我已延迟的消息的处理如在链接link1link2建议。但问题是即使在ttl之后,消息也会从死信队列中恢复到原始队列。我正在使用RabbitMQ 3.0.0
。任何人都可以指出什么是问题?
尝试AMQP包与兔子互动,也有一个非常体面的文件https://godoc.org/github.com/streadway/amqp – PerroVerd
@PerroVerd这是我在用。 – Naresh