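Versions
- Airflow: 1.10.7
- Kubernetes: 1.14.9
Setup
Airflow is configured to use the KubernetesExecutor; normal operation works fine.
DAGs and logs are accessed through an EFS volume defined with PersistentVolume & PersistentVolumeClaim specs.
I have the following k8s spec, which I want to use to run a backfill job: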
apiVersion: v1
kind: Pod
metadata:
  name: backfill-test
  namespace: airflow
spec:
  serviceAccountName: airflow-service-account
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: airflow-dags
    - name: airflow-logs
      persistentVolumeClaim:
        claimName: airflow-logs
  containers:
    - name: somename
      image: myimage
      volumeMounts:
        - name: airflow-dags
          mountPath: /usr/local/airflow/dags
          readOnly: true
        - name: airflow-logs
          mountPath: /usr/local/airflow/logs
          readOnly: false
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow
        - name: AIRFLOW__CORE__DAGS_FOLDER
          value: dags
        - name: AIRFLOW__CORE__BASE_LOG_FOLDER
          value: logs
        # - name: AIRFLOW__KUBERNETES__DAGS_VOLUME_MOUNT_POINT
        #   value: /usr/local/airflow/dags
        - name: AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH
          value: dags
        - name: AIRFLOW__KUBERNETES__LOGS_VOLUME_SUBPATH
          value: logs
        - name: AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM
          value: airflow-dags
        - name: AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM
          value: airflow-logs
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
          value: someimage_uri
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
          value: latest
        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: airflow-service-account
        - name: AIRFLOW_HOME
          value: usr/local/airflow
      # command: ["sleep", "1h"]
      command: ["airflow", "backfill",
                "my_dag",
                # # "--subdir", ".",
                # "--local",
                "--task_regex", "my_task_task",
                "--start_date", "2020-07-01T00:00:00",
                "--end_date", "2020-08-01T00:00:00"]
  restartPolicy: Never
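As a quick sanity check on the spec above, the sketch below flags which path-valued settings are relative rather than absolute. The values are copied from the env block; the helper name `relative_settings` is hypothetical. As far as I know, the comments in Airflow's default airflow.cfg describe dags_folder and base_log_folder as needing absolute paths, but treat that as an assumption to verify against your version. The volume subpath settings are left out because they are meant to be relative.

```python
import posixpath

# Path-valued settings copied verbatim from the pod spec's env block.
SPEC_PATHS = {
    "AIRFLOW_HOME": "usr/local/airflow",
    "AIRFLOW__CORE__DAGS_FOLDER": "dags",
    "AIRFLOW__CORE__BASE_LOG_FOLDER": "logs",
}


def relative_settings(settings):
    """Return the names of settings whose values are not absolute POSIX paths."""
    return sorted(
        name for name, value in settings.items() if not posixpath.isabs(value)
    )


if __name__ == "__main__":
    for name in relative_settings(SPEC_PATHS):
        print(f"{name} = {SPEC_PATHS[name]!r} is a relative path")
```

All three values in this spec are reported as relative, so how each one gets resolved depends on the working directory of whichever process reads it.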
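Problem
The problem appears to be a path issue that shows up when tasks are added to the queue.
When the initial command runs, the CLI finds the DAG and its associated tasks: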
airflow@backfill-test:~$ airflow run my_dag my_task 2020-07-01T01:15:00+00:00 --local --raw --force
[2020-08-27 23:14:42,038] {__init__.py:51} INFO - Using executor KubernetesExecutor
[2020-08-27 23:14:42,040] {dagbag.py:403} INFO - Filling up the DagBag from /usr/local/airflow/dags
Running %s on host %s <TaskInstance: my_dag.my_task 2020-07-01T01:15:00+00:00 [failed]> backfill-test
However, the tasks are added to the queue with a strange path. Below is the log from an actual task execution attempt:
[2020-08-27 23:14:43,019] {taskinstance.py:867} INFO - Starting attempt 3 of 2
[2020-08-27 23:14:43,019] {taskinstance.py:868} INFO -
--------------------------------------------------------------------------------
[2020-08-27 23:14:43,043] {taskinstance.py:887} INFO - Executing <Task(PostgresOperator): my_task> on 2020-07-01T01:15:00+00:00
[2020-08-27 23:14:43,046] {standard_task_runner.py:52} INFO - Started process 191 to run task
[2020-08-27 23:14:43,085] {logging_mixin.py:112} INFO - [2020-08-27 23:14:43,085] {dagbag.py:403} INFO - Filling up the DagBag from /usr/local/airflow/dags/usr/local/airflow/my_dag.py
[2020-08-27 23:14:53,006] {logging_mixin.py:112} INFO - [2020-08-27 23:14:53,006] {local_task_job.py:103} INFO - Task exited with return code 1
Adding --subdir to the initial command does not actually propagate to the queued tasks, and it produces the same log output.
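One observation, offered as a guess rather than a diagnosis: the doubled path in the task log has the shape you get when a relative path is joined onto the DAGs folder. The sketch below uses only the Python standard library and does not reproduce Airflow's internal logic; the helper name `join_onto_dags_folder` and the two candidate inputs are hypothetical, chosen only to show the difference between joining a relative and an absolute path.

```python
import posixpath

# Taken from the log lines in this post.
DAGS_FOLDER = "/usr/local/airflow/dags"
OBSERVED = "/usr/local/airflow/dags/usr/local/airflow/my_dag.py"


def join_onto_dags_folder(path):
    """Join a DAG file path onto the DAGs folder the way posixpath.join does."""
    return posixpath.join(DAGS_FOLDER, path)


# Hypothetical inputs: a relative file path (no leading slash) and an absolute one.
relative_result = join_onto_dags_folder("usr/local/airflow/my_dag.py")
absolute_result = join_onto_dags_folder("/usr/local/airflow/dags/my_dag.py")

if __name__ == "__main__":
    # A relative path is appended to the base, which gives the doubled form.
    print(relative_result == OBSERVED)
    # An absolute path replaces the base entirely, so nothing is doubled.
    print(absolute_result)
```

If this is what is happening, the relative values in the spec (for example AIRFLOW_HOME without a leading slash) would be the first thing to try changing, but I have not confirmed that.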