2016-06-09 99 views
2

我想这是在运行时注册任务的最佳方式。我workfflow如下:芹菜任务的动态注册

  • 开始芹菜应用
  • 开始Python应用程序
  • 的Python应用程序创建一个新的任务,我想在芹菜

回答

2

我已经完成的方式来安排是基于与“click package has with custom subcommands”具有相同想法的“插件”概念。

该应用结构(基于蟒3):

. 
├── dynamic_tasks.py 
├── run.py 
└── tasks 
    └── get_rate.py 

芹菜任务dynamic_tasks.py被定义为以下:

import os 
import celery 

app = celery.Celery('dynamic_tasks', broker='amqp://[email protected]/', backend='rpc://') 

PLUGIN_FOLDER = os.path.join(os.path.dirname(__file__), 'tasks') 
def _absolutepath(filename): 
    """ Return the absolute path to the filename""" 
    return os.path.join(PLUGIN_FOLDER, filename) 

@app.task 
def tasks(funcname, *args, **kwargs): 
    try: 
     funcname = funcname.replace('-', '_') 
     funcname += '.py' 
     func = _absolutepath(funcname) 
     ns = {} 
     with open(func) as f: 
      code = compile(f.read(), func, 'exec') 
      eval(code, ns, ns) 
     return ns['task'](*args, **kwargs) 
    except IOError as e: 
     # Manage IOError 
     raise e 

的可插拔任务示例任务/get_rate.py

""" This task get the currency rate between a pair of currencies """  
import urllib.request 

URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p' 

def task(pair='EURSEK', url_tmplt=URL): 
    with urllib.request.urlopen(url_tmplt.format(pair)) as res: 
     body = res.read() 
    return (pair, float(body.strip())) 

而且,简单地说,从run.py运行示例:

from dynamic_tasks import tasks 

print(tasks.delay('get_rate', 'EURSEK').get()) 

EDITED 既然型动物的机器上运行芹菜,不可能依靠本地文件系统。我的新方法是发送函数作为字符串执行:

@app.task 
def dynamic_tasks(funcname, funccode, *args, **kwargs): 
    try: 
     ns = {} 
     code = compile(funccode, funcname, 'exec') 
     eval(code, ns, ns) 
     logger.info('execute %r with args %r, %r', funcname, args, kwargs) 
     return ns['task'](*args, **kwargs) 
    except IOError: 
     logger.error("Error loading the dynamic function from text %s", funcname)