2016-06-07 52 views
2

我正在使用Celery调度一些长期任务的Django项目。 Django和Celery都运行在完全独立的进程中,并需要一种协调对数据库访问的方式。我想使用Python的multiprocessing.RLock类(或等价物),因为我需要该锁是可重入的。在多个独立进程中使用Python RLocks

我的问题是,我如何提供访问单独进程的RLock?

我发现的两个最佳解决方案(posix_ipc modulefcntl)仅限于基于Unix的系统,我们希望避免限制自己。

是否有跨平台的方式来共享流程之间的锁而不需要共同的祖先进程?

+1

这不是一个直接的答案,但除非你需要*硬*具有较强的顺序性锁定,你可能想看看在诸如[0MQ](http://zeromq.org/)的消息传递系统中。一个伟大的消息系统,几乎可以运行任何东西,并且可以绑定任何语言。 –

+0

对于* 0MQ *为+1,以便在用各种语言编写的进程之间进行通信并且延迟很大。我并不习惯Celery和它可能已经涉及(或限制)的东西,但也许你也可以考虑使用['redis'](http://redis.io/topics/distlock),已经有一些python绑定围绕这种功能(https://pypi.python.org/pypi/python-redis-lock,https://github.com/glasslion/redlock,https://github.com/SPSCommerce/redlock-py,等等) – mgc

+0

你确实意识到这个要求“没有共同的祖先过程”意味着你不能使用'multiprocessing',对吧? – Louis

回答

0

我结束了使用RabbitMQ作为创建分布式锁的方法。关于如何做到这一点的细节可以在RabbitMQ的博客上找到:https://www.rabbitmq.com/blog/2014/02/19/distributed-semaphores-with-rabbitmq/

简而言之,您将为该锁创建一个RabbitMQ队列并向其发送一条消息。要获取锁定,请在队列上运行basic_get(非阻塞)或basic_consume(阻塞)。这会从队列中移除消息,从而阻止其他线程获取锁定。一旦你的工作完成,发送一个否定的确认将导致RabbitMQ重新发送消息,允许下一个线程继续。

不幸的是,这不允许重入锁。

上面引用的链接为Java代码提供了如何去做这件事。搞清楚如何将它翻译成Python/Pika令人讨厌,我认为我应该在这里发布一些示例代码。

为了生产锁:

import pika 

with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection: 
    channel = connection.channel() 
    channel.queue_declare(queue="LockQueue") 
    channel.basic_publish(exchange='', routing_key='LockQueue', body='Lock') 
    channel.close() 

获取锁:

import pika 
import time 

def callback(ch, method, properties, body): 
    print("Got lock") 

    for i in range(5, 0, -1): 
     print("Tick {}".format(i)) 
     time.sleep(1) 

    print("Releasing lock") 
    ch.basic_nack(delivery_tag=method.delivery_tag) 
    ch.close() # Close the channel to continue on with normal processing. Without this, `callback` will continue to request the lock. 

with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection: 
    channel = connection.channel() 

    channel.queue_declare(queue='LockQueue') 
    channel.basic_qos(prefetch_count=1) 
    channel.basic_consume(callback, queue='LockQueue') 

    print("Waiting for lock") 
    channel.start_consuming() 
    print("Task completed")