2

使用最新版本的 apache 气流。从 LocalExecutor 开始,在该模式下一切正常,除了一些交互,Web UI 状态需要 CeleryExecutor 才能使用它们。使用 Redis 安装和配置 Celery 执行器,将 Redis 配置为代理 URL 和结果后端。

一开始它似乎可以工作,直到安排了一个任务,此时它给出了以下错误:

 File "/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 882, in scheduler
    job.run()
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 201, in run
    self._execute()
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1311, in _execute
    self._execute_helper(processor_manager)
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1444, in _execute_helper
    self.executor.heartbeat()
  File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 132, in heartbeat
    self.sync()
  File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 91, in sync
    state = async.state
  File "/usr/lib/python2.7/site-packages/celery/result.py", line 436, in state
    return self._get_task_meta()['status']
  File "/usr/lib/python2.7/site-packages/celery/result.py", line 375, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 352, in get_task_meta
    meta = self._get_task_meta_for(task_id)
  File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 668, in _get_task_meta_for
    return self.decode_result(meta)
  File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 271, in decode_result
    return self.meta_from_decoded(self.decode(payload))
  File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 278, in decode
    accept=self.accept)
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads
    return decode(data)
  File "/usr/lib64/python2.7/contextlib.py", line 35, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 54, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 50, in _reraise_errors
    yield
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads
    return decode(data)
  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads
    return load(BytesIO(s))
kombu.exceptions.DecodeError: invalid load key, '{'.

似乎是泡菜序列化错误,但我不确定如何追踪原因。有什么建议么?

这个问题一直影响我使用 subdag 功能的工作流程,也许问题与此有关。

注意:我也使用 rabbitMQ 进行了测试,那里有不同的问题;客户端显示“对等方重置连接”并崩溃。RabbitMQ 日志显示“客户端意外关闭 TCP 连接”。

4

3 回答 3

0

在我们的调度程序日志中看到完全相同的回溯后,我偶然发现了这一点:

  File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads
    return load(BytesIO(s))
kombu.exceptions.DecodeError: invalid load key, '{'.

celery 试图解开以 '{' 开头的东西的事实似乎很可疑,所以我获取了流量的 tcpdump 并通过 Web UI 触发了一个任务。结果捕获包括这个交换几乎与上述回溯出现在调度程序日志中的同一时刻:

05:03:49.145849 IP <scheduler-ip-addr>.ec2.internal.45597 > <redis-ip-addr>.ec2.internal.6379: Flags [P.], seq 658:731, ack 46, win 211, options [nop,nop,TS val 654768546 ecr 4219564282], length 73: RESP "GET" "celery-task-meta-b0d3a29e-ac08-4e77-871e-b4d553502cc2"
05:03:49.146086 IP <redis-ip-addr>.ec2.internal.6379 > <scheduler-ip-addr>.ec2.internal.45597: Flags [P.], seq 46:177, ack 731, win 210, options [nop,nop,TS val 4219564282 ecr 654768546], length 131: RESP "{"status": "SUCCESS", "traceback": null, "result": null, "task_id": "b0d3a29e-ac08-4e77-871e-b4d553502cc2", "children": []}"

Redis 响应的有效载荷显然是 JSON,那么为什么 celery 试图解开它呢?我们正在从 Airflow 1.7 迁移到 1.8,在我们推出期间,我们有一组运行 v1.7 的 Airflow 工作人员,另一组运行 v1.8。工作人员应该从具有不相交工作负载的队列中拉出,但由于我们的一个 DAG 中的错误,我们有一个由 Airflow 1.8 调度的 TaskInstance,然后由通过 Airflow 1.7 启动的 celery 工作人员执行。

AIRFLOW-1038将 celery 任务状态的序列化程序从 JSON(默认)更改为 pickle,因此在此更改之前运行代码版本的工作人员将在 JSON 中序列化结果,并且运行包含此更改的代码版本的调度程序将尝试通过 unpickling 反序列化结果,这会导致上述错误。

于 2017-10-09T17:26:21.450 回答
0

我们遇到了这个问题,我们的配置管理没有正确更新,所以调度程序和一些工作人员在 apache-airflow 1.8.2 上,而大量工作人员正在运行气流 1.8.0。检查所有节点是否运行相同版本的气流。

于 2019-08-23T00:04:21.697 回答
0

请验证您在airflow.cfg 中配置了哪种celery_result_backend。如果不是这种情况,请尝试将其切换到数据库后端(mysql 等)。

我们看到使用 ampq 后端(仅在 Celery 3.1 及更低版本上可用),redis 和 rpc 后端有时会出现问题。

于 2017-11-06T15:37:26.330 回答