2016-08-03 65 views
7

我有一个Azure WebJob项目,我在本地开发机器上运行。它正在侦听Azure服务总线消息队列。没有什么像主题一样,只是最基本的消息队列。如何防止Azure webjob多次同时处理相同的消息

它正在多次接收/处理相同的消息,当收到消息时立即启动两次,然后在消息正在处理的同时间歇性地启动。

问题:

  • 为什么我收到即时相同的消息多次?它似乎是在PeekLock应用之前重新获取?
  • 即使邮件仍在处理中,邮件又是如何被重新收到?我可以设置PeekLock持续时间,或者以某种方式将消息锁定为仅处理一次
  • 如何确保队列上的每条消息只处理一次?
  • 我希望能够一次处理多个消息,多次不是同一个消息,因此将MaxConcurrentCalls设置为1似乎不是我的答案,还是我误解了该属性?

我使用的是异步功能,简单的注射器和一个自定义JobActivator,所以不是一个静态无效的方法,我的函数签名是:

public async Task ProcessQueueMessage([ServiceBusTrigger("AnyQueue")] MediaEncoderQueueItem message, TextWriter log) {...} 

的工作里面,它是围绕移动一些文件在blob服务上,以及从媒体服务呼叫(并等待)媒体编码器。所以,虽然网络作业本身并没有做很多处理,但需要相当长的时间(15分钟,对于某些文件)。

该应用程序正在启动,当我将消息发送到队列时,它会作出响应。但是,一旦收到消息接收的消息多次:

Executing: 'Functions.ProcessQueueMessage' - Reason: 'New ServiceBus message detected on 'MyQueue'.' 
Executing: 'Functions.ProcessQueueMessage' - Reason: 'New ServiceBus message detected on 'MyQueue'.' 

此外,虽然正在运行的任务(我看到来自媒体服务的功能输出),它会得到另一个“复制”从队列中。

最终在任务完成后,它仍然间歇地处理相同的消息。

+1

什么是在队列中指定的DeliveryCount和LockDuration? –

+0

我需要看看它。关于LockDuration的奇怪之处在于,它似乎几乎立即收到了两条消息,好像它并没有锁定它。我想知道这是否与我设置异步处理程序的方式有关? – AndrewP

+0

听起来有点偏离。如果LockDuration没有过期,代理将永远不会给同一消费者提供相同的消息。我怀疑还有其他事情正在发生。你有可能在GitHub或BitBucket上共享repro? –

回答

7

我怀疑发生了什么是以下几点: 最大DurationLock可能是5分钟。如果在5分钟内处理完消息,则消息被标记为已完成并从代理中删除。否则,如果处理时间超过5分钟(我们失去了对消息的锁定),消息将重新出现,并且会再次消耗。您可以通过查看消息的DeliveryCount来验证。

要解决该问题,可以在即将过期前使用BrokeredMessage.RenewLockAsync()续订消息锁定。

+0

我会检查出这种方法,并会查看交付计数和持续时间锁定。奇怪的是,它并不是每一次都在发生,而且它发生的时间似乎并没有太大的一致性,而且它似乎也立即处理了两次相同的信息。但是,多谢指出这个功能! – AndrewP

+0

@AndrewP如果你的工作抛出一个未处理的异常,并且在服务总线配置中有自动完成消息为真,那么它也可能是自动重试的情况。您可以通过简单地在您的触发器函数中抛出一个ex来测试,然后向该主题发布一条消息 - 您应该看到一个消息检测,直到最大重试次数。 – JoeBrockhaus

+0

默认情况下锁不是自动更新的吗?所以5分钟后,如果处理仍在进行,锁将自动更新。或者我错过了什么? – GETah