
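Race condition in a ZooKeeper and Python based message queue

I've been evaluating ZooKeeper as a simple message queue, and I wrote two very simple scripts: an mq feeder and an mq consumer. The feeder (below) pushes 20 jobs onto the queue and then monitors the queue state (jobs being consumed):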

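from kazoo.client import KazooClient

zk = KazooClient(hosts='xxx')
zk.start()
zk.ensure_path("/queue")  # make sure the parent node exists before adding children

# Enqueue 20 jobs named /queue/0 .. /queue/19
for i in xrange(20):
    zk.create("/queue/%s" % i, b"%s" % i)

# Then keep printing whatever is left in the queue
while 1:
    print zk.get_children('/queue')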

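The consumer (below) is started several times (up to 3 concurrent processes in my tests). It fetches the job list and iterates over it looking for an unlocked job, processes that job (sleeping a random number of seconds to simulate some work), and once finished deletes the job and then deletes the lock: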

from kazoo.client import KazooClient 
from kazoo.exceptions import NodeExistsError 
from time import sleep 
import random 

zk = KazooClient(hosts='xxx') 
zk.start() 
zk.ensure_path("/locks") 
zk.ensure_path("/queue") 

while 1:
    jobs = sorted(zk.get_children('/queue'))
    if jobs:
        for i in jobs:
            print "Checking job: %s" % i
            try:
                zk.create("/locks/%s" % i)
            except NodeExistsError:
                print "Job is locked, skipping!"
                pass
            else:
                print "Job is unlocked, processing."
                sleep(random.randrange(5))
                zk.delete("/queue/%s" % i)
                print "Deleted processed job, deleting the lock."
                zk.delete("/locks/%s" % i)
                pass
    else:
        print "There are no jobs in the queue."
        pass

The problem I'm seeing, and can't track down, is that the consumer processes exit with:

Traceback (most recent call last):
  File "zk_consumer.py", line 24, in <module>
    zk.delete("/queue/%s" % i)
  File "/Library/Python/2.7/site-packages/kazoo/client.py", line 1055, in delete
    return self.delete_async(path, version).get()
  File "/Library/Python/2.7/site-packages/kazoo/handlers/threading.py", line 107, in get
    raise self._exception
kazoo.exceptions.NoNodeError: ((), {})

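Meanwhile the last process keeps checking, forever, the single job left in the queue, which is always locked. Clearly I have a logic error here that I think causes a race condition, but I've spent some time on it and can't seem to spot it. What am I doing wrong, or is ZooKeeper not a viable solution for a simple work queue?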

Answer


Your code is racy. Consider this sequence:

T1                                 T2
read queue/1
                                   read queue/1
                                   write lock/1
                                   delete queue/1
                                   delete lock/1
write lock/1
delete queue/1 (FAIL, no node!)
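The interleaving above can be replayed deterministically. The sketch below is a minimal illustration, assuming a tiny in-memory stand-in for ZooKeeper: `FakeZK` and the local `NodeExistsError`/`NoNodeError` classes are hypothetical (named after kazoo's exceptions, but not kazoo itself) and mimic only the two behaviours that matter here, namely that creating an existing node fails and deleting a missing node fails. Paths follow the question's consumer.

```python
class NodeExistsError(Exception):
    """Hypothetical stand-in for kazoo.exceptions.NodeExistsError."""


class NoNodeError(Exception):
    """Hypothetical stand-in for kazoo.exceptions.NoNodeError."""


class FakeZK:
    """Hypothetical in-memory stand-in for ZooKeeper: a flat set of paths."""

    def __init__(self):
        self.nodes = set()

    def create(self, path):
        if path in self.nodes:
            raise NodeExistsError(path)
        self.nodes.add(path)

    def delete(self, path):
        if path not in self.nodes:
            raise NoNodeError(path)
        self.nodes.remove(path)


def replay_race():
    """Replay the T1/T2 sequence; return T1's outcome and the nodes left over."""
    zk = FakeZK()
    zk.create("/queue/1")

    t1_job = "1"                     # T1: read queue/1 (stale snapshot from here on)
    t2_job = "1"                     # T2: read queue/1

    zk.create("/locks/%s" % t2_job)  # T2: write lock/1
    zk.delete("/queue/%s" % t2_job)  # T2: delete queue/1
    zk.delete("/locks/%s" % t2_job)  # T2: delete lock/1

    zk.create("/locks/%s" % t1_job)  # T1: write lock/1 succeeds, the lock is free again
    try:
        zk.delete("/queue/%s" % t1_job)  # T1: delete queue/1
        outcome = "ok"
    except NoNodeError as exc:
        # The original consumer dies here, before it deletes its lock.
        outcome = "NoNodeError: %s" % exc
    return outcome, set(zk.nodes)


print(replay_race())
```

Taking the lock succeeds for T1 because T2 has already released it, so the lock alone says nothing about whether the job still exists. Since the exception escapes before the lock is deleted, `/locks/1` is left behind in the replay, just as it would be in the original consumer.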

After taking the lock, you need to read the queue again to make sure nobody else has already deleted queue/1.
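A minimal sketch of that re-check, again against a hypothetical in-memory stand-in rather than a real ensemble; `try_process` is a hypothetical helper, and with kazoo the corresponding calls would be `zk.create`, `zk.exists` and `zk.delete`. The check is sufficient only because, in this protocol, a job node is deleted solely by whoever holds its lock: once a consumer holds `/locks/1` and sees `/queue/1`, nobody else can remove that job until the lock is released.

```python
class NodeExistsError(Exception):
    """Hypothetical stand-in for kazoo.exceptions.NodeExistsError."""


class NoNodeError(Exception):
    """Hypothetical stand-in for kazoo.exceptions.NoNodeError."""


class FakeZK:
    """Hypothetical in-memory stand-in for ZooKeeper: a flat set of paths."""

    def __init__(self):
        self.nodes = set()

    def create(self, path):
        if path in self.nodes:
            raise NodeExistsError(path)
        self.nodes.add(path)

    def delete(self, path):
        if path not in self.nodes:
            raise NoNodeError(path)
        self.nodes.remove(path)

    def exists(self, path):
        return path in self.nodes


def try_process(zk, job, work):
    """Hypothetical helper: lock `job`, re-check it, run `work`, clean up.

    Returns True if this consumer processed the job, False if it skipped it.
    """
    lock_path = "/locks/%s" % job
    job_path = "/queue/%s" % job
    try:
        zk.create(lock_path)
    except NodeExistsError:
        return False                 # another consumer holds the lock
    try:
        # Re-read after locking: the children list we iterated over may be
        # stale, and another consumer may already have finished this job.
        if not zk.exists(job_path):
            return False
        work(job)
        zk.delete(job_path)          # safe: only the lock holder deletes jobs
        return True
    finally:
        zk.delete(lock_path)         # release our own lock on every way out


if __name__ == "__main__":
    zk = FakeZK()
    zk.create("/queue/1")
    done = []
    print(try_process(zk, "1", done.append))  # T2 runs the job first
    print(try_process(zk, "1", done.append))  # T1, stale listing: skips cleanly
    print(done, zk.nodes)
```

The `finally` releases the lock even if `work` raises, and a consumer that loses the race now skips the job instead of crashing. In a real kazoo consumer, creating the lock with `ephemeral=True` would also let ZooKeeper remove it when the consumer's session ends, so a crashed process would not leave a job locked for good.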


You're right, but I also found a more correct way to achieve what I need, namely Kazoo's LockingQueue recipe: https://kazoo.readthedocs.org/en/latest/api/recipe/queue.html#kazoo.recipe.queue.LockingQueue – SpankMe 2013-05-04 20:38:27