0
我一直在评估ZooKeeper作为一个简单的消息队列,我写了两个非常简单的脚本:mq feeder和mq consumer。馈线,下面,是精心推出20个职位的队列,然后监视队列状态(工种被消耗):ZooKeeper和基于Python的消息队列中的竞争条件
from kazoo.client import KazooClient
zk = KazooClient(hosts='xxx')
zk.start()
for i in xrange(20):
zk.create("/queue/%s" % i, b"%s" % i)
while 1:
print zk.get_children('/queue')
消费者,下面,正在启动几次(最多3个并发进程中我的测试),并取任务列表,在它迭代找开锁的工作,处理它(休眠随机数秒,以模拟一些工作),一旦完成,将删除该作业,然后删除该锁:
from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError
from time import sleep
import random
zk = KazooClient(hosts='xxx')
zk.start()
zk.ensure_path("/locks")
zk.ensure_path("/queue")
while 1:
jobs = sorted(zk.get_children('/queue'))
if jobs:
for i in jobs:
print "Checking job: %s" % i
try:
zk.create("/locks/%s" % i)
except NodeExistsError:
print "Job is locked, skipping!"
pass
else:
print "Job is unlocked, processing."
sleep(random.randrange(5))
zk.delete("/queue/%s" % i)
print "Deleted processed job, deleting the lock."
zk.delete("/locks/%s" % i)
pass
else:
print "There's no locks in the queue."
pass
我看到了,我是无法追踪的问题是,消费者的过程与退出:
Traceback (most recent call last):
File "zk_consumer.py", line 24, in <module>
zk.delete("/queue/%s" % i)
File "/Library/Python/2.7/site-packages/kazoo/client.py", line 1055, in delete
return self.delete_async(path, version).get()
File "/Library/Python/2.7/site-packages/kazoo/handlers/threading.py", line 107, in get
raise self._exception
kazoo.exceptions.NoNodeError: ((), {})
而最后的进程仍然永远检查单的工作,留在队列中,但始终处于锁定状态。很明显,我在这里遇到了一些逻辑错误,我认为这会导致竞争状态,但我花了一些时间在上面,而我似乎无法发现它。我在这里做错了什么,或者ZooKeeper不是简单工作队列的可行解决方案?
你是正确的,但我也发现了有一个更正确的以实现我所需要的,也就是说,使用Kazoo的LockingQueue配方,如https://kazoo.readthedocs.org/en/latest/api/recipe/queue.html#kazoo.recipe.queue.LockingQueue – SpankMe 2013-05-04 20:38:27