2015-11-02 58 views
1

如何防止 Celery 任务在函数返回时就被标记为成功?我有这样一个任务:

@celery.task(bind=True, max_retries=5)
def send_notification_task(self, user_id, content):
    """Publish a hard-coded test message to '16182' with success/error callbacks.

    ``user_id`` and ``content`` are accepted but not used by this version.
    Nothing in this function waits for either callback: once ``publish`` has
    been called it reports a PROGRESS state and returns.
    """
    print('Sending message to pubnub')

    payload = prepare_message(16182, 16182, 'Android', 'This is a text message')

    def _on_published(message):
        # Success callback handed to publish().
        print('success callback')
        print(message)

    def _on_failed(message):
        # Error callback handed to publish(): request a Celery retry.
        print('error callback')
        print(self.request.retries)
        print(self)
        # Randomised exponential back-off: a base drawn between 2 and 4,
        # raised to the number of retries so far, truncated to whole seconds.
        delay = int(random.uniform(2, 4) ** self.request.retries)
        print(delay)
        raise self.retry(countdown=delay)

    publish('16182', payload, _on_published, _on_failed)

    print('returning success to celery')

    # NOTE(review): 'tota' looks like a typo for 'total'; kept unchanged here
    # because it is part of the emitted task metadata.
    current_task.update_state(
        state='PROGRESS',
        meta={'description': 'Doing some task', 'current': 59, 'tota': 73},
    )

在这个任务中,我希望只有在回调方法(成功回调或失败回调)被调用之后,才把任务标记为成功或失败。但是,只要这个函数一返回,任务就会被标记为成功(publish 本身是异步调用)。

我该如何处理?

我已经尝试将任务标记为PROGRESS作为最后一行。

回答

0

在这里回答我自己的问题。我最终使用了一个自定义的 CountDownLatch。以下是更新后的代码:

def send_notification_task(self, user, content):
    """Publish a notification and block until a publish callback has fired.

    The publish callbacks record the outcome in ``status`` and release a
    one-shot latch; the task body waits on that latch so it returns (or
    retries) only after the outcome is known.

    NOTE(review): the Celery decorator is not shown in this snippet; the use
    of ``self.request`` / ``self.retry`` implies a bound task
    (``bind=True``) -- confirm at the registration site.

    Args:
        self: the bound task instance.
        user: mapping that must contain ``'user_channel_id'``; it is also
            passed through to ``prepare_message``.
        content: mapping with required ``'title'`` and ``'message'`` keys and
            optional ``'image_url'`` (default ``None``) and ``'category'``
            (default ``'pnr'``).

    Returns:
        ``'success'`` when the success callback fired, ``None`` when
        ``prepare_message`` produced no message.

    Raises:
        Whatever ``self.retry`` returns, when the error callback fired.
    """
    started_at = datetime.now()

    latch = CountDownLatch(1)
    print('Sending message to pubnub')

    # Mutable holder so the nested callbacks can report the outcome:
    # 0 = no callback yet, 1 = success callback, 2 = error callback.
    status = {'result': 0}

    print("Here is the content:")
    print(content)

    image_url = content.get('image_url')
    category = content.get('category', 'pnr')

    msg = prepare_message(user, content['title'], content['message'], image_url, category)

    if msg is None:
        print('Not sending None message (probably iOS device)')
        return

    def _callback(message):
        print('success callback')
        print(message)
        status['result'] = 1
        latch.count_down()

    def _error(message):
        # Only record the failure here; the retry itself is raised from the
        # task body below, after the latch has been released.
        print('error callback')
        status['result'] = 2
        print(str(status) + ' error')
        latch.count_down()

    print(msg)
    print(user['user_channel_id'])

    publish(user['user_channel_id'], msg, _callback, _error)

    # Block until one of the callbacks has counted the latch down. The method
    # is looked up by name because `await` is a reserved word on Python 3.7+
    # and `latch.await()` would not even parse there.
    getattr(latch, 'await')()

    if status['result'] == 2:
        print('retry')
        countdown = exponential_backoff(self.request.retries)
        raise self.retry(countdown=countdown)
    elif status['result'] == 1:
        print('success')
        elapsed = datetime.now() - started_at
        print('send_notification_task {0}'.format(elapsed))
        return 'success'

以下是 CountDownLatch 的实现:

import threading 

class CountDownLatch(object):
    """One-shot latch: waiters block until the counter has dropped to zero.

    ``count`` is the number of ``count_down()`` calls needed to release the
    waiters. ``lock`` is the ``threading.Condition`` guarding ``count``.
    """

    def __init__(self, count=1):
        self.count = count
        self.lock = threading.Condition()

    def count_down(self):
        """Decrement the counter and wake all waiters once it is <= 0."""
        # `with` releases the condition's lock even if an exception is raised.
        with self.lock:
            self.count -= 1
            if self.count <= 0:
                self.lock.notify_all()

    def wait(self):
        """Block the calling thread until the counter is <= 0."""
        with self.lock:
            # Re-check in a loop: a wake-up does not by itself guarantee that
            # the counter has reached zero.
            while self.count > 0:
                self.lock.wait()


# Backward-compatible alias for the original method name. `await` is a
# reserved word on Python 3.7+, so it cannot be written as `def await` there
# and is attached by name instead; existing `latch.await()` callers on
# Python 2 keep working.
setattr(CountDownLatch, "await", CountDownLatch.wait)