2014-11-21 74 views
8

我遇到问题时发现了一些似乎对我来说会比较简单的事情。使用Celery初始化一个带参数的工作人员

我使用芹菜3.1与Python 3,我想用参数初始化我的工人,以便他们可以使用这些细节进行设置。

具体而言:这些工作人员将使用需要使用认证凭证与第三方API进行交互的任务。在消费任何任务之前,工作人员必须将认证详细信息传递给API服务器(认证详细信息在第一次认证请求后存储在cookie中)。

我想通过CLI启动这些登录凭证给工作人员。然后,我希望工作人员使用它们进行身份验证并存储会话以供将来的任务使用(理想情况下,这将存储在可以从任务访问的属性中)。

芹菜可能吗?

作为一个便笺,我已经考虑将requests.session对象(来自Python requests库)作为任务参数传递,但这需要序列化,看起来像是皱眉。

回答

16

我建议使用抽象任务基类并缓存requests.session

从芹菜文档:

任务不是实例为每个请求,但被注册在任务注册表作为一个全球性的实例。

这意味着__init__构造函数只会在每个进程中调用一次,并且该任务类在语义上更接近Actor。

这对缓存资源也很有用......

@app.task(base=APITask, bind=True) 
def call_api(self, url): 
    # self will refer to the task instance (because we're using bind=True) 
    self.session.get(url) 

您也可以使用app.task装饰作为将在设置一个额外的参数通过API认证选项:当您创建,这将使API请求的任务

import requests 
from celery import Task 

class APITask(Task): 
    """API requests task class.""" 

    abstract = True 

    # the cached requests.session object 
    _session = None 

    def __init__(self): 
     # since this class is instantiated once, use this method 
     # to initialize and cache resources like a requests.session 
     # or use a property like the example below which will create 
     # a requests.session only the first time it's accessed 

    @property 
    def session(self): 
     if self._session is None: 
      # store the session object for the first time 
      session = requests.Session() 
      session.auth = ('user', 'pass') 

      self._session = session 

     return self._session 

现在任务__dict__,例如:

# pass a custom auth argument 
@app.task(base=APITask, bind=True, auth=('user', 'pass')) 
def call_api(self, url): 
    pass 

,使基类使用通过了uthentication选项:

class APITask(Task): 
    """API requests task class.""" 

    abstract = True 

    # the cached requests.session object 
    _session = None 

    # the API authentication 
    auth =() 

    @property 
    def session(self): 
     if self._session is None: 
      # store the session object for the first time 
      session = requests.Session() 
      # use the authentication that was passed to the task 
      session.auth = self.auth 

      self._session = session 

     return self._session 

你可以阅读更多的芹菜文档网站:

现在回到你原来的问题这是传递额外的参数给来自命令行的工作人员:

有关于这在芹菜文档Adding new command-line options一个部分,这里是传递一个用户名和密码在命令行工作人员的例子:

$ celery worker -A appname --username user --password pass 

代码:

from celery import bootsteps 
from celery.bin import Option 


app.user_options['worker'].add(
    Option('--username', dest='api_username', default=None, help='API username.') 
) 

app.user_options['worker'].add(
    Option('--password', dest='api_password', default=None, help='API password.') 
) 


class CustomArgs(bootsteps.Step): 

    def __init__(self, worker, api_username, api_password, **options): 
     # store the api authentication 
     APITask.auth = (api_username, api_password) 


app.steps['worker'].add(CustomArgs) 
+0

非常好,我很难破译文档中的所有内容。谢谢你把它铺好。 – 2014-11-24 22:39:51

+0

对不起,再次挖掘一次,你能否澄清如何将命令行参数从Boostep传递给任务初始化(以便我可以使用命令提供的用户名和密码初始化任务会话对象-线)。目标是不以纯文本存储我的API凭据。 – 2014-11-28 06:08:32

+0

@JoshuaGilman对于延迟抱歉,我用一个例子更新了答案。 – Pierre 2014-12-01 12:20:02

0

我想你可以调用你使用命令行参数编写的脚本。像下面这样:

my_script.py username password 

里面你的脚本,你可以有你的主要功能封装在一个@celery.task@app.task装饰。

import sys 

from celery import Celery 

cel = Celery() # put whatever config info you need in here 

@celery.task 
def main(): 
    username, password = sys.argv[1], sys.argv[2] 

这样的事情应该让你开始。请务必查看Python的argparse以获得更复杂的参数解析。

+0

谢谢,但您无法通过调用python脚本来启动工作进程。你必须像这样调用芹菜:'celery -A proj worker -l info' – 2014-11-22 00:18:38

+0

我们在这里必须有一个非常奇怪的设置,然后......因为这看起来就像它的工作。我必须研究我们正在做的更多。 – 2014-11-24 14:22:56