2017-09-23 77 views
2

与测试谷歌酒吧工作/次的客户端(v0.28.3)PubSub的Ack'd消息不断重新提供每10s(Python的公测客户端)

有没有人看到这样一个场景,相同的消息是不断每10秒重新发放一次,即使在acking之后呢?

这超出了Pub/Sub的至少一次性质。它偶尔发生,但是当它发生时,我们会连续几个小时看到相同的信息。

嫌疑人这是因为我们处理来自用户的后台线程中的传入消息;但尚未能始终如一地重现它。由于某种原因,这不是犹太教吗?

如果有错误,很乐意提交,但假设我们做错了。有没有人处理过类似的问题?


随着调试日志,我们看到:

D 13:51:46.000 Received response: received_messages { ... message_id: "155264162517414" ... } 
D 13:51:46.000 New message received from Pub/Sub: %r 
I 13:51:46.000 Processing Message: 155264162517414 
I 13:51:48.000 Acking Message: 155264162517414 
D 13:51:48.000 Sending request: ack_ids: "LDR..." 
D 13:51:50.000 Snoozing lease management for 4.009431 seconds. 
D 13:51:50.000 Renewing lease for 0 ack IDs. 
D 13:51:50.000 The current p99 value is 10 seconds. 
... 
D 13:51:59.000 Received response: received_messages { ... message_id: "155264162517414" ... } 
D 13:51:59.000 New message received from Pub/Sub: %r 
I 13:51:59.000 Processing Message: 155264162517414 

这里是代码的玩具版本,展示了我们如何线程,这有时会触发本地运行的问题:

import Queue 
import logging 
import threading 
import random 
import time 
from google.cloud import pubsub 


SUBSCRIPTION_PATH = ... 


class Worker(threading.Thread): 
    """Background thread to consume incoming messages.""" 
    def __init__(self, name): 
     threading.Thread.__init__(self, name=name) 
     self.queue = Queue.Queue() 

    def run(self): 
     while True: 
      message = self.queue.get() 
      self.process(message) 
      print '<< Acking :', message.message_id 
      message.ack() 
      self.queue.task_done() 

    def process(self, message): 
     """Fake some work by sleeping for 0-15s. """ 
     s = random.randint(0, 15) 
     print '>> Worker sleeping for ', s, message.message_id 
     for i in range(s): 
      time.sleep(1) 
      print i 


class Subscriber(threading.Thread): 
    """Handles the subscription to pubsub.""" 
    def __init__(self): 
     threading.Thread.__init__(self, name='Subscriber') 
     self.subscriber = pubsub.SubscriberClient() 
     self.worker = Worker('FakeWorker') 
     self.worker.daemon = True 

    def run(self): 
     self.worker.start() 

     flow_control = pubsub.types.FlowControl(max_messages=10) 
     policy = self.subscriber.subscribe(SUBSCRIPTION_PATH, 
      flow_control=flow_control, 
      callback=self._consume) 
     print 'Sub started, thread', threading.current_thread() 

    def _consume(self, message): 
     self.worker.queue.put(message) 


if __name__ == '__main__': 
    subscriber = Subscriber() 
    subscriber.start() 
    while 1: 
     pass 

谢谢!

回答

0

除Pub/Sub的至少一次性质外,Pub/Sub中的acks是尽力而为的。这意味着有两种潜在的方法可以使ack“搞砸”。

  1. 该消息可以被pub/sub成功acked,并重新发送一次(可能是由于竞争条件)。
  2. 消息可能无法成功确认。

在第二种情况获得的情况下,客户端库不会给你任何错误(因为客户端库本身没有给出),并且你将开始看到消息的节奏(和如果你的工艺时间很短,这将会是10秒钟)。

解决此问题的方法是在收到邮件时再次简单回复邮件。我假设(从玩具代码中不清楚,所以我猜测),你只是无视重复的消息,但如果你重复了,你应该停止获得它。

如果你重新acking的消息,那么请open an issue针对客户端库。

+0

感谢卢克的回应。我相信我们一直在重复这些重复的信息。 Unf,尽最大努力确保重新传输的意义在于,我们必须保持集群范围内处理消息的缓存,这并不是很理想。现在我们切换到Google的prod基于HTTP的库;并提交了支持票。 – Greg

相关问题