2017-07-04 112 views
10

我正在研究一个Django应用程序。我有一个API端点,如果请求,必须执行必须重复几次(直到某个条件成立)的函数。我如何处理它现在的问题是 -Django - 每x秒运行一个函数

def shut_down(request): 
    # Do some stuff 
    while True: 
    result = some_fn() 
    if result: 
     break 
    time.sleep(2) 

    return True 

虽然我知道这是一个可怕的办法,我不应该阻止2秒,我不能想出如何绕过它。
这个工程,说了4秒钟后。但是我想要让循环在后台运行,并且在some_fn返回True时停止。 (也可以肯定some_fn会返回True)

编辑 -
阅读Oz123的回应给了我一个似乎工作的想法。以下是我所做的 -

def shut_down(params): 
    # Do some stuff 
    # Offload the blocking job to a new thread 

    t = threading.Thread(target=some_fn, args=(id,), kwargs={}) 
    t.setDaemon(True) 
    t.start() 

    return True 

def some_fn(id): 
    while True: 
     # Do the job, get result in res 
     # If the job is done, return. Or sleep the thread for 2 seconds before trying again. 

     if res: 
      return 
     else: 
      time.sleep(2) 

这对我来说没有工作。这很简单,但我不知道多线程与Django结合的效率。
如果有人可以指出这个缺陷,批评是赞赏。

+1

您可以使用芹菜为此。你可以在这里找到手册:https://realpython.com/blog/python/asynchronous-tasks-with-django-and-celery/#periodic-tasks – neverwalkaloner

+0

@neverwalkaloner听起来像我所需要的。将尝试使用它,谢谢。 :) – Zeokav

+1

正如@neverwalkaloner提到的那样,您可以使用芹菜,您可以按计划设置定期任务,查看文档非常灵活http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html看看它的crontab模块。 –

回答

5

对于许多小型项目,芹菜是一种超必杀。对于那些可以使用schedule的项目,它非常易于使用。

有了这个库,你可以做任何功能执行定期任务:

import schedule 
import time 

def job(): 
    print("I'm working...") 

schedule.every(10).minutes.do(job) 
schedule.every().hour.do(job) 
schedule.every().day.at("10:30").do(job) 
schedule.every().monday.do(job) 
schedule.every().wednesday.at("13:15").do(job) 

while True: 
    schedule.run_pending() 
    time.sleep(1) 

的例子在阻塞的方式运行,但如果在常见问题看,你会发现,你还可以在运行任务并行线程,这样你不会阻止,并删除一旦不需要了任务:从日程安排进口计划

def run_continuously(self, interval=1): 
    """Continuously run, while executing pending jobs at each elapsed 
    time interval. 
    @return cease_continuous_run: threading.Event which can be set to 
    cease continuous run. 
    Please note that it is *intended behavior that run_continuously() 
    does not run missed jobs*. For example, if you've registered a job 
    that should run every minute and you set a continuous run interval 
    of one hour then your job won't be run 60 times at each interval but 
    only once. 
    """ 

    cease_continuous_run = threading.Event() 

    class ScheduleThread(threading.Thread): 

     @classmethod 
     def run(cls): 
      while not cease_continuous_run.is_set(): 
       self.run_pending() 
       time.sleep(interval) 

    continuous_thread = ScheduleThread() 
    continuous_thread.setDaemon(True) 
    continuous_thread.start() 
    return cease_continuous_run 


Scheduler.run_continuously = run_continuously 

这里 是一类方法的用法的例子:

def foo(self): 
     ... 
     if some_condition(): 
      return schedule.CancelJob # a job can dequeue it 

    # can be put in __enter__ or __init__ 
    self._job_stop = self.scheduler.run_continuously() 

    logger.debug("doing foo"...) 
    self.foo() # call foo 
    self.scheduler.every(5).seconds.do(
     self.foo) # schedule foo for running every 5 seconds 

    ... 
    # later on foo is not needed any more: 
    self._job_stop.set() 

    ... 

    def __exit__(self, exec_type, exc_value, traceback): 
     # if the jobs are not stop, you can stop them 
     self._job_stop.set() 
+0

事实上,我在寻找避免芹菜(现在)的方法,因为我只需要在我的项目的一个或两个地方安排时间。我刚刚遇到了这样的哈哈!如果我有任何问题,我会尝试实施并回复。感谢您的答复! :) – Zeokav

+0

我在下面发布了一个答案。如果你能告诉我这是否可能会导致问题,我会很感激。 – Zeokav

+0

有趣的约定 - 你为什么不创建一个继承自'Scheduler'的类继承'run_continuously'方法的原因?这会让事情变得更加直接明了。 – Tim

1

我建议你使用芹菜的task管理。你可以参考this来设置这个应用程序(包,如果你是从javaScript背景)。

一旦设定,就可以改变代码:

@app.task 
def check_shut_down(): 
    if not some_fun(): 
     # add task that'll run again after 2 secs 
     check_shut_down.delay((), countdown=3) 
    else: 
     # task completed; do something to notify yourself 
     return True