6

我正在使用气流稳定的掌舵图并使用 Kubernetes Executor,正在为 dag 安排新的 pod,但找不到 dag_id 失败的问题。我正在使用 git-sync 来获取 dags。以下是错误和 kubernetes 配置值。有人可以帮我解决这个问题吗?

错误:

[2020-07-01 23:18:36,939] {__init__.py:51} INFO - Using executor LocalExecutor
[2020-07-01 23:18:36,940] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags/dags/etl/sampledag_dag.py
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 37, in <module>
    args.func(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 75, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/bin/cli.py", line 523, in run
    dag = get_dag(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/bin/cli.py", line 149, in get_dag
    'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: sampledag  . Either the dag did not exist or it failed to parse.

配置:

      AIRFLOW__KUBERNETES__DELETE_WORKER_PODS: false
      AIRFLOW__KUBERNETES__GIT_REPO: git@git.com/dags.git
      AIRFLOW__KUBERNETES__GIT_BRANCH: master
      AIRFLOW__KUBERNETES__GIT_DAGS_FOLDER_MOUNT_POINT: /dags
      AIRFLOW__KUBERNETES__GIT_SSH_KEY_SECRET_NAME: git-secret
      AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: airflow-repo
      AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: tag
      AIRFLOW__KUBERNETES__RUN_AS_USER: "50000"

采样标签

import logging
import datetime

from airflow import models
from airflow.contrib.operators import kubernetes_pod_operator
import os

args = {
    'owner': 'airflow'
}

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)


try:
    print("Entered try block")
    with models.DAG(
            dag_id='sampledag',
            schedule_interval=datetime.timedelta(days=1),
            start_date=YESTERDAY) as dag:
     
        print("Initialized dag")
               kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
                # The ID specified for the task.
                task_id='trigger-task',
                # Name of task you want to run, used to generate Pod ID.
                name='trigger-name',
                namespace='scheduler',
                in_cluster = True,
     
                cmds=["./docker-run.sh"],
                is_delete_operator_pod=False,
                image='imagerepo:latest',
                image_pull_policy='Always',
                dag=dag)

        print("done")

except Exception as e:
    print(str(e))
    logging.error("Error at {}, error={}".format(__file__, str(e)))
    raise

4

4 回答 4

2

我遇到过同样的问题。我通过在我的配置中添加以下内容来解决它:

AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH: repo/

发生的事情是 init 容器将下载你的 dags,[AIRFLOW__KUBERNETES__GIT_DAGS_FOLDER_MOUNT_POINT]/[AIRFLOW__KUBERNETES__GIT_SYNC_DEST]默认AIRFLOW__KUBERNETES__GIT_SYNC_DEST情况下是repohttps://airflow.apache.org/docs/stable/configurations-ref.html#git-sync-dest

于 2020-08-20T03:34:01.470 回答
0

我猜这个问题可能是由于您的设置不同导致的:/opt/airflow/dags/dags/etl/sampledag_dag.pyAIRFLOW__KUBERNETES__GIT_DAGS_FOLDER_MOUNT_POINT: /dags

我会仔细检查这些是否是您想要的,并且是您所期望的。

于 2020-07-03T15:36:04.993 回答
0

在尝试使用稳定的舵气流图使用 Kubernetes Executor 时,我遇到了同样的问题。在我的情况下,我能够通过更改 helm chart 部分AIRFLOW__KUBERNETES__RUN_AS_USER: "50000"来解决它。AIRFLOW__KUBERNETES__GIT_SYNC_RUN_AS_USER: "65533"env

此链接中提到了相同的值

我得出了这个结论,因为在临时工作 pod 出现之前正在运行的 init 容器(git sync)无法将 git dag 克隆/同步到工作 pod。就我而言,存在权限错误(即使正确传递了 ssh 克隆的 kube 密码)

笔记:

kubectl get pods -n [NAMESPACE]
kubectl logs -n [NAMESPACE] [POD_ID] -c git-sync
于 2020-08-08T22:22:34.147 回答
0

遇到同样的问题,我通过@gtrip 的建议解决了这个问题,将 git-sync run 用户的 UID 设置为 65533。

我将添加以下调试提示:

kubectl get pods -n [NAMESPACE]
kubectl logs -n [NAMESPACE] [POD_ID] -c git-sync
于 2020-08-26T09:34:49.680 回答