
Versions

  • Airflow: 1.10.7
  • Kubernetes: 1.14.9

Setup

Airflow is configured to use the Kubernetes Executor, and normal operation works fine.

DAGs and logs are accessed on an EFS volume, defined via PersistentVolume & PersistentVolumeClaim specs.
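For context, the volume objects look roughly like this. This is a minimal sketch of the dags pair only; the EFS endpoint, path, and sizes are placeholders, and the logs pair is analogous:

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  capacity:
    storage: 5Gi
  accessModes:
    - ReadWriteMany
  nfs:
    # placeholder EFS mount target, not the real endpoint
    server: fs-xxxxxxxx.efs.us-east-1.amazonaws.com
    path: /
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: ""
  resources:
    requests:
      storage: 5Gi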

I have the following k8s spec, which I want to use to run the backfill job:

apiVersion: v1
kind: Pod
metadata:
  name: backfill-test
  namespace: airflow
spec:
  serviceAccountName: airflow-service-account
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: airflow-dags
    - name: airflow-logs
      persistentVolumeClaim:
        claimName: airflow-logs
  containers:
  - name: somename
    image: myimage
    volumeMounts:
    - name: airflow-dags
      mountPath: /usr/local/airflow/dags
      readOnly: true
    - name: airflow-logs
      mountPath: /usr/local/airflow/logs
      readOnly: false
    env:
    - name: AIRFLOW__CORE__EXECUTOR
      value: KubernetesExecutor
    - name: AIRFLOW__KUBERNETES__NAMESPACE
      value: airflow
    - name: AIRFLOW__CORE__DAGS_FOLDER
      value: dags
    - name: AIRFLOW__CORE__BASE_LOG_FOLDER
      value: logs
    # - name: AIRFLOW__KUBERNETES__DAGS_VOLUME_MOUNT_POINT
    #   value: /usr/local/airflow/dags
    - name: AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH
      value: dags
    - name: AIRFLOW__KUBERNETES__LOGS_VOLUME_SUBPATH
      value: logs
    - name: AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM
      value: airflow-dags
    - name: AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM
      value: airflow-logs
    - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
      value: someimage_uri
    - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
      value: latest
    - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
      value: airflow-service-account
    - name: AIRFLOW_HOME
      value: usr/local/airflow

    # command: ["sleep", "1h"]


    command: ["airflow", "backfill", 
              "my_dag",
    #           # "--subdir", ".",
    #           "--local",
              "--task_regex", "my_task_task",
              "--start_date", "2020-07-01T00:00:00",
              "--end_date", "2020-08-01T00:00:00"]
  restartPolicy: Never
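The pod is launched with a plain kubectl apply (the filename is just whatever the spec above is saved as):

kubectl apply -f backfill-test.yaml
kubectl logs -f backfill-test -n airflow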

Problem

The problem appears to be a path issue when the task is added to the queue.

When running the initial command, the CLI finds the DAG and the associated task:

airflow@backfill-test:~$ airflow run my_dag my_task 2020-07-01T01:15:00+00:00 --local --raw --force
[2020-08-27 23:14:42,038] {__init__.py:51} INFO - Using executor KubernetesExecutor
[2020-08-27 23:14:42,040] {dagbag.py:403} INFO - Filling up the DagBag from /usr/local/airflow/dags
Running %s on host %s <TaskInstance: my_dag.my_task 2020-07-01T01:15:00+00:00 [failed]> backfill-test

However, the task is added to the queue with a mangled path: in the DagBag line below, the DAGS_FOLDER has a second, relative copy of the airflow home path appended to it. Here is the log from the actual task execution attempt.

[2020-08-27 23:14:43,019] {taskinstance.py:867} INFO - Starting attempt 3 of 2
[2020-08-27 23:14:43,019] {taskinstance.py:868} INFO -
--------------------------------------------------------------------------------
[2020-08-27 23:14:43,043] {taskinstance.py:887} INFO - Executing <Task(PostgresOperator): my_task> on 2020-07-01T01:15:00+00:00
[2020-08-27 23:14:43,046] {standard_task_runner.py:52} INFO - Started process 191 to run task
[2020-08-27 23:14:43,085] {logging_mixin.py:112} INFO - [2020-08-27 23:14:43,085] {dagbag.py:403} INFO - Filling up the DagBag from /usr/local/airflow/dags/usr/local/airflow/my_dag.py
[2020-08-27 23:14:53,006] {logging_mixin.py:112} INFO - [2020-08-27 23:14:53,006] {local_task_job.py:103} INFO - Task exited with return code 1

Adding --subdir to the initial command does not actually propagate to the queued task, and produces the same log output.
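For reference, the --subdir variant matched the commented-out line in the spec above, i.e. roughly:

airflow backfill my_dag \
    --subdir . \
    --task_regex my_task_task \
    --start_date 2020-07-01T00:00:00 \
    --end_date 2020-08-01T00:00:00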
