Google Pub/Sub Python beta client (v0.28.3): ack'd messages continually redelivered every 10s
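Has anyone seen a scenario where the same message is redelivered every 10 seconds, even after it has been acked?
This goes beyond the at-least-once nature of Pub/Sub. It only happens occasionally, but when it does, we see the same message over and over for hours.
I suspect it is because we hand incoming messages from the subscriber to a background thread for processing, but I have not been able to reproduce it consistently. Is that not kosher for some reason?
Happy to file a bug if there is one, but I'm assuming we are doing something wrong. Has anyone dealt with a similar issue?
With debug logging enabled, we see: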
D 13:51:46.000 Received response: received_messages { ... message_id: "155264162517414" ... }
D 13:51:46.000 New message received from Pub/Sub: %r
I 13:51:46.000 Processing Message: 155264162517414
I 13:51:48.000 Acking Message: 155264162517414
D 13:51:48.000 Sending request: ack_ids: "LDR..."
D 13:51:50.000 Snoozing lease management for 4.009431 seconds.
D 13:51:50.000 Renewing lease for 0 ack IDs.
D 13:51:50.000 The current p99 value is 10 seconds.
...
D 13:51:59.000 Received response: received_messages { ... message_id: "155264162517414" ... }
D 13:51:59.000 New message received from Pub/Sub: %r
I 13:51:59.000 Processing Message: 155264162517414
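To quantify redeliveries from logs like the ones above, here is a minimal sketch that counts how often each message ID shows up in a "Processing Message" line. The helper names `delivery_counts` and `redelivered` are hypothetical, and the regular expression assumes the exact log wording shown in this post.

```python
import collections
import re

# Matches the application log line shown above, e.g.
# "I 13:51:46.000 Processing Message: 155264162517414"
PROCESSING = re.compile(r"Processing Message: (\d+)")


def delivery_counts(log_lines):
    """Count how many times each message ID was handed to processing."""
    counts = collections.Counter()
    for line in log_lines:
        match = PROCESSING.search(line)
        if match:
            counts[match.group(1)] += 1
    return counts


def redelivered(log_lines):
    """Return only the message IDs that were processed more than once."""
    return {mid: n for mid, n in delivery_counts(log_lines).items() if n > 1}
```

Feeding it the excerpt above would flag message 155264162517414 as processed twice.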
Here is a toy version of the code showing how we do the threading; it sometimes triggers the issue when run locally:
import Queue
import logging
import threading
import random
import time

from google.cloud import pubsub

SUBSCRIPTION_PATH = ...


class Worker(threading.Thread):
    """Background thread to consume incoming messages."""

    def __init__(self, name):
        threading.Thread.__init__(self, name=name)
        self.queue = Queue.Queue()

    def run(self):
        while True:
            message = self.queue.get()
            self.process(message)
            print '<< Acking :', message.message_id
            message.ack()
            self.queue.task_done()

    def process(self, message):
        """Fake some work by sleeping for 0-15s."""
        s = random.randint(0, 15)
        print '>> Worker sleeping for ', s, message.message_id
        for i in range(s):
            time.sleep(1)
            print i


class Subscriber(threading.Thread):
    """Handles the subscription to pubsub."""

    def __init__(self):
        threading.Thread.__init__(self, name='Subscriber')
        self.subscriber = pubsub.SubscriberClient()
        self.worker = Worker('FakeWorker')
        self.worker.daemon = True

    def run(self):
        self.worker.start()
        flow_control = pubsub.types.FlowControl(max_messages=10)
        policy = self.subscriber.subscribe(SUBSCRIPTION_PATH,
                                           flow_control=flow_control,
                                           callback=self._consume)
        print 'Sub started, thread', threading.current_thread()

    def _consume(self, message):
        self.worker.queue.put(message)


if __name__ == '__main__':
    subscriber = Subscriber()
    subscriber.start()
    while 1:
        pass
Thanks!
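Thanks for the response, Luke. I believe these repeated duplicate messages are what we have been running into. Unfortunately, the implication of best-effort guarantees around redelivery is that we would have to maintain a cluster-wide cache of processed messages, which isn't ideal. For now we have switched to Google's production HTTP-based library and filed a support ticket. – Greg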
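For illustration, the "cache of processed messages" idea can be sketched in-process as below. This is only an assumption-laden sketch: the class name `SeenMessages`, its `first_time` method, and the TTL value are hypothetical, and a truly cluster-wide version would need a shared store rather than a per-process dictionary.

```python
import threading
import time


class SeenMessages(object):
    """Remembers recently processed message IDs for `ttl` seconds.

    Hypothetical helper: in-memory only, so it deduplicates within one
    process, not across a cluster.
    """

    def __init__(self, ttl=3600.0, clock=time.time):
        self._ttl = ttl
        self._clock = clock      # injectable so the TTL can be tested
        self._expiry = {}        # message_id -> expiry timestamp
        self._lock = threading.Lock()

    def first_time(self, message_id):
        """Return True once per ID within the TTL window, False for repeats."""
        now = self._clock()
        with self._lock:
            # Drop expired entries so the cache does not grow without bound.
            for mid in [m for m, t in self._expiry.items() if t <= now]:
                del self._expiry[mid]
            if message_id in self._expiry:
                return False
            self._expiry[message_id] = now + self._ttl
            return True
```

In the toy worker above, this could be used as `if seen.first_time(message.message_id): self.process(message)`, acking the message either way. Note that recording the ID before processing means a message whose processing fails would be skipped on redelivery, so a real implementation might record it only after success.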