5

Airflow封装的 DAG似乎是合理的生产气流部署的重要组成部分。

我有一个由配置文件驱动的带有动态 subDAG 的 DAG,例如:

配置.yaml:

imports:
  - project_foo
  - project_bar`

这会产生 subdag 任务,例如imports.project_{foo|bar}.step{1|2|3}.

我通常使用 python 的open函数读取配置文件,a laconfig = open(os.path.join(os.path.split(__file__)[0], 'config.yaml')

不幸的是,在使用打包的 DAG 时,这会导致错误:

Broken DAG: [/home/airflow/dags/workflows.zip] [Errno 20] Not a directory: '/home/airflow/dags/workflows.zip/config.yaml'

有什么想法/最佳实践可以在这里推荐吗?

4

1 回答 1

3

这有点杂乱无章,但我最终还是回到了通过ZipFile.

import yaml
from zipfile import ZipFile
import logging
import re

def get_config(yaml_filename):
  """Parses and returns the given YAML config file.

  For packaged DAGs, gracefully handles unzipping.
  """
  zip, post_zip = re.search(r'(.*\.zip)?(.*)', yaml_filename).groups()
  if zip:
    contents = ZipFile(zip).read(post_zip.lstrip('/'))
  else:
    contents = open(post_zip).read()
  result = yaml.safe_load(contents)
  logging.info('Parsed config: %s', result)
  return result

正如您对 main 所期望的那样工作dag.py

get_config(os.path.join(path.split(__file__)[0], 'config.yaml'))
于 2018-08-13T23:20:56.933 回答