2017-02-27 109 views
0

蟒蛇调用ansibleApi与芹菜返回无,我搜索了几天。它与调用部署函数没有芹菜,但与芹菜我的代码调用ansibleApi返回无。django +芹菜+ ansibleApi返回无

重现步骤。

1.tasks.py

from celery import shared_task 
from .deploy_tomcat2 import django_process 


@shared_task 
def deploy(jira_num): 
    #return 'hello world {0}'.format(jira_num) 
    #rdb.set_trace() 
    return django_process(jira_num)  

2.deploy_tomcat2.py

from .AnsibleApi import CallApi 

def django_process(jira_num): 
    server = '10.10.10.30' 
    name = 'abc' 
    port = 11011 
    code = 'efs' 
    jdk = '1.12.13' 
    jvm = 'xxxx' 

    if str.isdigit(jira_num): 
     # import pdb 
     # pdb.set_trace() 
     call = CallApi(server,name,port,code,jdk,jvm) 
     return call.run_task() 

3.AnsibleApi.py

#!/usr/bin/env python 

import logging 
from .Logger import Logger 
from django.conf import settings 
from collections import namedtuple 
from ansible.parsing.dataloader import DataLoader 
from ansible.vars import VariableManager 
from ansible.inventory import Inventory 
from ansible.playbook.play import Play 
from ansible.executor.task_queue_manager import TaskQueueManager 
from ansible.plugins.callback import CallbackBase 

Log = Logger('/tmp/auto_deploy_tomcat.log',logging.INFO) 


class ResultCallback(CallbackBase): 
    def __init__(self, *args, **kwargs): 
     super(ResultCallback ,self).__init__(*args, **kwargs) 
     self.host_ok = {} 
     self.host_unreachable = {} 
     self.host_failed = {} 

    def v2_runner_on_unreachable(self, result): 
     self.host_unreachable[result._host.get_name()] = result 

    def v2_runner_on_ok(self, result, *args, **kwargs): 
     self.host_ok[result._host.get_name()] = result 

    def v2_runner_on_failed(self, result, *args, **kwargs): 
     self.host_failed[result._host.get_name()] = result 


class CallApi(object): 
    user = settings.SSH_USER 
    ssh_private_key_file = settings.SSH_PRIVATE_KEY_FILE 
    results_callback = ResultCallback() 
    Options = namedtuple('Options', 
         ['connection', 'module_path', 'private_key_file', 'forks', 'become', 'become_method', 
          'become_user', 'check']) 

    def __init__(self,ip,name,port,code,jdk,jvm): 
     self.ip = ip 
     self.name = name 
     self.port = port 
     self.code = code 
     self.jdk = jdk 
     self.jvm = jvm 
     self.results_callback = ResultCallback() 
     self.results_raw = {} 

    def _gen_user_task(self): 
     tasks = [] 
     deploy_script = 'autodeploy/tomcat_deploy.sh' 
     dst_script = '/tmp/tomcat_deploy.sh' 
     cargs = dict(src=deploy_script, dest=dst_script, owner=self.user, group=self.user, mode='0755') 
     args = "%s %s %d %s %s '%s'" % (dst_script, self.name, self.port, self.code, self.jdk, self.jvm) 
     tasks.append(dict(action=dict(module='copy', args=cargs),register='shell_out')) 
     tasks.append(dict(action=dict(module='debug', args=dict(msg='{{shell_out}}')))) 
     # tasks.append(dict(action=dict(module='command', args=args))) 
     # tasks.append(dict(action=dict(module='command', args=args), register='result')) 
     # tasks.append(dict(action=dict(module='debug', args=dict(msg='{{result.stdout}}')))) 
     self.tasks = tasks 

    def _set_option(self): 
     self._gen_user_task() 

     self.variable_manager = VariableManager() 
     self.loader = DataLoader() 
     self.options = self.Options(connection='smart', module_path=None, private_key_file=self.ssh_private_key_file, forks=None, 
            become=True, become_method='sudo', become_user='root', check=False) 
     self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager, host_list=[self.ip]) 
     self.variable_manager.set_inventory(self.inventory) 

     play_source = dict(
     name = "auto deploy tomcat", 
      hosts = self.ip, 
      remote_user = self.user, 
      gather_facts='no', 
      tasks = self.tasks 
     ) 
     self.play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader) 

    def run_task(self): 
     self.results_raw = {'success':{}, 'failed':{}, 'unreachable':{}} 
     tqm = None 
     from celery.contrib import rdb;rdb.set_trace() 
     #import pdb;pdb.set_trace() 
     self._set_option() 
     try: 
      tqm = TaskQueueManager(
       inventory=self.inventory, 
       variable_manager=self.variable_manager, 
       loader=self.loader, 
       options=self.options, 
       passwords=None, 
       stdout_callback=self.results_callback, 
      ) 
      result = tqm.run(self.play) 
     finally: 
      if tqm is not None: 
       tqm.cleanup() 

     for host, result in self.results_callback.host_ok.items(): 
      self.results_raw['success'][host] = result._result 

     for host, result in self.results_callback.host_failed.items(): 
      self.results_raw['failed'][host] = result._result 

     for host, result in self.results_callback.host_unreachable.items(): 
      self.results_raw['unreachable'][host]= result._result 
     Log.info("result is :%s" % self.results_raw) 
     return self.results_raw 

4.celery工人

celery -A jira worker -Q queue.ops.deploy -n "deploy.%h" -l info 

5.produce msg:

deploy.apply_async(args=['150'], queue='queue.ops.deploy', routing_key='ops.deploy') 

回答

0

看起来没问题。
唯一的问题是None是否真的deploy任务返回?
如果你可以发布你的芹菜工人日志会更好。

+0

'[2017年2月27日16:46:08554:INFO/MainProcess]所获任务:autodeploy.tasks.deploy [a963d1f3-cc1b-48da-9701-28297f7b68a5]'' [2017年2月27日16:46:08,786:INFO/PoolWorker-2]结果为:{'success':{},'failed':{},'unreachable':{}}' '[2017-02-27 16:46: 08,808:INFO/PoolWorker-2]任务autodeploy.tasks.deploy [a963d1f3-cc1b-48da-9701-28297f7b68a5]成功0.18285173299955204s:{'success':{},'failed':{},'unreachable':{}}' –

+0

看来你找到了问题'任务不允许启动子进程',不过不好建议取消这个限制,你可以参考一下[这个讨论](https://github.com/celery/celery/issues/1709)好像要自己控制(1,换成线程; 2.daemon = FALSE;其他没仔细看:)).'TaskQueueManager'没用过,没法帮你... – Cheney

+0

这个技能还没有get,晚点再研究一下。 –

0

有两种方法来解决这个问题,禁用断言: 1.where芹菜开始设置出口PYTHONOPTIMIZE = 1或与此参数-O优化 2.停用蟒分组多process.py线102开始芹菜:

assert not _current_process._config.get('daemon'), \ 
       'daemonic processes are not allowed to have children'