2017-09-05 76 views
0

我在我的Django项目中使用了一个Celery任务,并带有锁,如this article中所述。它工作的很好,但我的任务创建一个对象,我不希望在数据库中提交对象之前释放锁。我如何才能更改此上下文管理器以等待任务中的对象提交?发布交易提交时的芹菜锁

@contextmanager 
def lock(lock_id, oid, expire=600): 
    timeout_at = monotonic() + expire - 3 
    status = cache.add(lock_id, oid, expire) 
    try: 
     yield status 
    finally: 
     if monotonic() < timeout_at: 
      cache.delete(lock_id) 

@celery.task(bind=True, ignore_result=True) 
def my_task(self, object_id): 
    with lock('my_task.{}'.format(object_id), self.app.oid) as acquired, transaction.atomic(): 
     if not acquired: 
      self.retry(countdown=1) 
     def on_commit(): 
      # release the lock only in this moment 
      pass 
     transaction.on_commit(on_commit) 
     MyModel.objects.create(object_id=object_id) 

回答

1

此上下文管理器创建一个锁并在事务中封装正文。只有在提交事务或发生异常(除celery.exceptions.Retry)之外,它才会释放锁。

如芹菜文档指出:

为了使其正常工作,你需要使用一个缓存后端其中。新增操作是原子。已知memcached可以很好地用于此目的。

from celery.exceptions import Retry 
from contextlib import contextmanager 
from time import monotonic 
from django.core.cache import cache 
from django.db import transaction 


@contextmanager 
def lock_transaction(lock_id, oid, expire=600): 
    status = cache.add(lock_id, oid, expire) 
    timeout_at = monotonic() + expire - 3 
    is_retry = False 

    def on_commit(): 
     if not is_retry and monotonic() < timeout_at: 
      cache.delete(lock_id) 

    with transaction.atomic(): 
     transaction.on_commit(on_commit) 
     try: 
      yield status 
     except Retry as e: 
      is_retry = True 
     except: 
      if monotonic() < timeout_at: 
       cache.delete(lock_id) 
      raise 

使用的一个示例:

@celery.task(bind=True, ignore_result=True, max_retries=90, time_limit=60) 
def create_or_add_counter_task(self, object_id): 
    with lock_transaction('object_id.{}'.format(object_id), self.app.oid) as acquired: 
     if not acquired: 
      self.retry(countdown=1) 
     try: 
      obj = MyModel.objects.get(object_id=object_id) 
      obj.counter += 1 
      obj.save() 
     except MyModel.DoesNotExist: 
      MyModel.objects.create(object_id=object_id)