2017-09-19

Apache Airflow Celery Redis DecodeError

Using the latest version of Apache Airflow. I started with the LocalExecutor, and in that mode everything works fine, except that some of the Web UI's interactions require the CeleryExecutor. I installed and configured the Celery executor with Redis, setting Redis as both the broker URL and the result backend.
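For reference, a minimal airflow.cfg for this kind of setup would look roughly like the following sketch; the hostname, port, and database number are placeholders, and the exact key names can differ slightly between Airflow versions:

```ini
[core]
# Switch from the default LocalExecutor to Celery
executor = CeleryExecutor

[celery]
# Redis serving as both the message broker and the result backend
broker_url = redis://my-redis-host:6379/0
celery_result_backend = redis://my-redis-host:6379/0
```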

It appeared to work at first, until a task was scheduled, at which point it produced the following error:

  File "/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 882, in scheduler
    job.run()
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 201, in run
    self._execute()
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1311, in _execute
    self._execute_helper(processor_manager)
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1444, in _execute_helper
    self.executor.heartbeat()
  File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 132, in heartbeat
    self.sync()
  File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 91, in sync
    state = async.state
  File "/usr/lib/python2.7/site-packages/celery/result.py", line 436, in state
    return self._get_task_meta()['status']
  File "/usr/lib/python2.7/site-packages/celery/result.py", line 375, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 352, in get_task_meta
    meta = self._get_task_meta_for(task_id)
  File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 668, in _get_task_meta_for
    return self.decode_result(meta)
  File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 271, in decode_result
    return self.meta_from_decoded(self.decode(payload))
  File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 278, in decode
    accept=self.accept)
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads
    return decode(data)
  File "/usr/lib64/python2.7/contextlib.py", line 35, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 54, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 50, in _reraise_errors
    yield
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads
    return decode(data)
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads
    return load(BytesIO(s))
kombu.exceptions.DecodeError: invalid load key, '{'.

This looks like a pickle serialization error, but I don't know how to track down the cause. Any suggestions?

This issue has consistently affected my workflows that use the subdag feature, so the problem may be related to that.

Note: I also tested with RabbitMQ, which has a different problem: the client shows "Connection reset by peer" and crashes, and the RabbitMQ logs show "client unexpectedly closed TCP connection".

Answers

0

I stumbled onto this question after seeing exactly the same traceback in our scheduler logs:

  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads
    return load(BytesIO(s))
kombu.exceptions.DecodeError: invalid load key, '{'.

Celery was trying to unpickle something that suspiciously began with "{", so I took a tcpdump of the traffic and triggered the task through the web UI. The resulting capture includes this exchange at almost exactly the moment the traceback above appeared in the scheduler log:

05:03:49.145849 IP <scheduler-ip-addr>.ec2.internal.45597 > <redis-ip-addr>.ec2.internal.6379: Flags [P.], seq 658:731, ack 46, win 211, options [nop,nop,TS val 654768546 ecr 4219564282], length 73: RESP "GET" "celery-task-meta-b0d3a29e-ac08-4e77-871e-b4d553502cc2" 
05:03:49.146086 IP <redis-ip-addr>.ec2.internal.6379 > <scheduler-ip-addr>.ec2.internal.45597: Flags [P.], seq 46:177, ack 731, win 210, options [nop,nop,TS val 4219564282 ecr 654768546], length 131: RESP "{"status": "SUCCESS", "traceback": null, "result": null, "task_id": "b0d3a29e-ac08-4e77-871e-b4d553502cc2", "children": []}" 

The response from Redis is clearly JSON, so why is Celery trying to unpickle the payload? We were migrating from Airflow 1.7 to 1.8, and during our rollout we had one Airflow worker running v1.7 and another running v1.8. The workers were supposed to pull from separate queues, but because of a bug in our DAGs we had a TaskInstance that was scheduled by Airflow 1.8 and then executed by a Celery worker launched via Airflow 1.7.

AIRFLOW-1038 changed the serializer for Celery task state from JSON (the default) to pickle, so a worker running a code version from before this change serializes its results as JSON, while a scheduler running a version that includes the change tries to deserialize the result by unpickling it, which produces the error above.
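The version mismatch can be reproduced in isolation: serialize a task-meta payload the way a pre-AIRFLOW-1038 worker would (as JSON), then try to deserialize it the way a post-change scheduler does (as pickle). This is a minimal sketch of the mismatch, not Airflow's actual code path:

```python
import json
import pickle

# A result payload as an older (JSON-serializing) worker would store it in Redis.
payload = json.dumps({
    "status": "SUCCESS",
    "traceback": None,
    "result": None,
    "task_id": "b0d3a29e-ac08-4e77-871e-b4d553502cc2",
    "children": [],
}).encode("utf-8")

# A newer scheduler configured for pickle tries to unpickle the bytes.
# The first byte, '{', is not a valid pickle opcode, hence the error message.
try:
    pickle.loads(payload)
except pickle.UnpicklingError as exc:
    print(exc)  # invalid load key, '{'.
```

Kombu wraps this `UnpicklingError` in the `kombu.exceptions.DecodeError` seen in the traceback.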

0

Please check which celery_result_backend you have configured in airflow.cfg. If it is not already a database backend (MySQL, etc.), try switching it to one.
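If you do switch, the setting would look something like the following; the connection string is a placeholder, and the target database must already exist:

```ini
[celery]
# SQLAlchemy-style URL prefixed with "db+", as expected by Celery's database result backend
celery_result_backend = db+mysql://airflow:airflow@my-mysql-host:3306/airflow
```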

We have occasionally seen problems with the amqp backend (which only works with Celery 3.1 and earlier), as well as with the redis and rpc backends.