2012-08-04 107 views
11

我一直在使用Flask为我的k8055 USB接口板提供简单的Web API;相当标准的得分手和推杆,而Flask真的让我的生活变得更轻松。如何在Python中使用Flask执行定期任务

但我希望能够注册乳清发生时状态的变化。例如,如果我有一个连接到电路板的按钮,我可以轮询该特定端口的api。但是如果我想让输出直接反映输出结果,不管有人是否在与api交谈,我会有这样的事情。

while True: 
    board.read() 
    board.digital_outputs = board.digital_inputs 
    board.read() 
    time.sleep(1) 

而且每隔一秒,输出将被更新以匹配输入。

Flask下有什么办法做这种事吗?我之前在Twisted中做过类似的事情,但Flask对于这个特殊的应用来说太放手了,现在就放弃它了...

谢谢。

回答

11

您可以使用cron执行简单的任务。

为您的任务创建烧瓶视图。

# a separate view for periodic task 
@app.route('/task') 
def task(): 
    board.read() 
    board.digital_outputs = board.digital_inputs 

然后使用cron的,下载来自该网址定期

# cron task to run each minute 
0-59 * * * * run_task.sh 

凡run_task.sh内容

wget http://localhost/task 

cron是无法运行更频繁一次一分钟。如果你需要更高的频率,(比如说,分别5秒期间=每分钟12次),你都必须做在tun_task.sh通过以下方式

# loop 12 times with a delay 
for i in 1 2 3 4 5 6 7 8 9 10 11 12 
do 
    # download url in background for not to affect delay interval much 
    wget -b http://localhost/task 
    sleep 5s 
done 
-1

不,在Flask中没有任何支持,但是您可以使用flask-celery或只需在单独的线程(greenlet)中运行您的函数。

+3

感谢您的建议。 我去了gevent/greenlet路线,但它似乎'主'线程不屈服于循环线程(使用gevent.sleep而不是上述时间) 至于芹菜,肯定实施消息队列服务器是过度杀毒为什么这么'简单'(TM)? – Bolster 2012-08-05 11:54:48

1

对于我的瓶的应用程序,我考虑使用所描述的方法的cron由Pashka在他的answerschedule库和APScheduler

我发现APScheduler很简单,并且服务于定期任务运行目的,所以继续使用APScheduler。

示例代码:

from flask import Flask, jsonify 

from apscheduler.schedulers.background import BackgroundScheduler 


app = Flask(__name__) 

def test_job(): 
    print('I am working...') 

scheduler = BackgroundScheduler() 
job = scheduler.add_job(test_job, 'interval', minutes=1) 
scheduler.start() 
相关问题