2017-07-07 57 views
7

我正在使用celery在Hadoop上运行长时间运行的任务。每个任务在Hadoop上执行一个Pig脚本,该脚本运行约30分钟 - 2个小时。使用不同的值初始化不同的芹菜工人

我目前的Hadoop设置有4个队列a,b,c和默认值。所有任务当前正由单个工作人员执行,该工作人员将工作提交给单个队列。

我想添加另外3个将作业提交到其他队列的工作人员,每个队列一个工人。

问题是队列目前是硬编码的,我希望为每个工人做这个变量。

我搜索了很多,但我无法找到一种方法来传递每个芹菜工作者不同的队列值,并在我的任务中访问它。

我开始像我这样的芹菜工人。

celery -A app.celery worker 

我希望通过一些额外的参数在命令行本身和访问它在我的任务,但芹菜抱怨说,它不理解我的自定义参数。

我打算通过设置--concurrency=3参数来运行同一主机上的所有工人。有没有解决这个问题的方法?

谢谢!

编辑

目前的情况是这样的。每次我试图说tasks.print_something.delay()执行任务print_something只打印队列C.

@celery.task() 
def print_something(): 
    print "C" 

我需要有工人打印基于我通过什么样的价值给他们,而他们开始一个变量信。

@celery.task() 
def print_something(): 
    print "<Variable Value Per Worker Here>" 
+0

是否有可能分享您已实施的代码段? –

回答

3

希望这可以帮助别人。

需要解决这个问题的多个问题。

第一步是在芹菜中添加对自定义参数的支持。如果没有完成,芹菜会抱怨它不了解参数。

因为我用烧瓶运行芹菜,所以我初始化芹菜。

def configure_celery(): 
    app.config.update(
     CELERY_BROKER_URL='amqp://:@localhost:5672', 
     RESULT_BACKEND='db+mysql://root:@localhost:3306/<database_name>'    
    ) 
    celery = Celery(app.import_name, backend=app.config['RESULT_BACKEND'], 
        broker=app.config['CELERY_BROKER_URL']) 
    celery.conf.update(app.config) 
    TaskBase = celery.Task 

    class ContextTask(TaskBase): 
     abstract = True 

     def __call__(self, *args, **kwargs): 
      with app.app_context(): 
       return TaskBase.__call__(self, *args, **kwargs) 

    celery.Task = ContextTask 
    return celery 

我调用这个函数来初始化芹菜并将其存储在一个变量,名为芹菜。

celery = configure_celery() 

要添加自定义参数,您需要执行以下操作。

def add_hadoop_queue_argument_to_worker(parser): 
    parser.add_argument(
     '--hadoop-queue', help='Hadoop queue to be used by the worker' 
    ) 

下面使用的芹菜是我们从上面的步骤中获得的。

celery.user_options['worker'].add(add_hadoop_queue_argument_to_worker) 

下一步就是让该参数在worker中可访问。要做到这一点,请遵循以下步骤

class HadoopCustomWorkerStep(bootsteps.StartStopStep): 

    def __init__(self, worker, **kwargs): 
     worker.app.hadoop_queue = kwargs['hadoop_queue'] 

通知芹菜使用这个类创建工人。

celery.steps['worker'].add(HadoopCustomWorkerStep) 

该任务现在应该能够访问变量。

@app.task(bind=True) 
def print_hadoop_queue_from_config(self): 
    print self.app.hadoop_queue 

通过在命令行上运行worker来验证它。

celery -A app.celery worker --concurrency=1 --hadoop-queue=A -n [email protected]%h 
celery -A app.celery worker --concurrency=1 --hadoop-queue=B -n [email protected]%h 
celery -A app.celery worker --concurrency=1 --hadoop-queue=C -n [email protected]%h 
celery -A app.celery worker --concurrency=1 --hadoop-queue=default -n [email protected]%h 
3

我最常做的是,在启动后,工作人员在另一个脚本(任务不执行)(说manage.py)我想补充与参数命令来启动特定的任务或任务使用不同的参数。

在manager.py

from tasks import some_task 

@click.command 
def run_task(params): 
    some_task.apply_async(params) 

,并根据需要,这将启动任务。