I'm running Airflow 2.1.4 with docker-compose and the Celery executor. So far I've been able to get simple DockerOperator tasks up and running from the celery worker container, but now, when I try to mount a directory from a shared drive into the task container, I get an error (log file below). If I don't define the mounts argument, the DAG works fine. So I'm guessing some piece of information or privilege is not being passed on to the container that the celery worker container creates. Any suggestions on what to try next?
DAG file:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.docker_operator import DockerOperator
default_args = {
    'owner': 'airflow',
    'description': 'Use of the DockerOperator',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 3),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
with DAG('docker_dag', default_args=default_args, schedule_interval="5 * * * *", catchup=False) as dag:
    t1 = BashOperator(
        task_id='print_current_date',
        bash_command='date'
    )

    t2 = DockerOperator(
        task_id='docker_command',
        image='centos:latest',
        api_version='auto',
        auto_remove=True,
        environment={
            'AF_EXECUTION_DATE': "{{ ds }}",
            'AF_OWNER': "{{ task.owner }}"
        },
        #command="/bin/date > /airflow/dev/tmp/date.txt",
        command="/bin/sleep 5",
        docker_url='unix://var/run/docker.sock',
        mounts=["/airflow/dev/tmp:/airflow/dev/tmp"],
        network_mode='bridge'
    )

    t3 = BashOperator(
        task_id='print_hello',
        bash_command='echo "hello world"'
    )

    t1 >> t2 >> t3
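For comparison, the same task with the mounts line removed completes without errors (trimmed):

    t2 = DockerOperator(
        task_id='docker_command',
        image='centos:latest',
        api_version='auto',
        auto_remove=True,
        command="/bin/sleep 5",
        docker_url='unix://var/run/docker.sock',
        network_mode='bridge'
    )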
Log file:
*** Reading local file: /opt/airflow/logs/docker_dag/docker_command/2021-11-03T13:51:12.410942+00:00/1.log
[2021-11-03 13:51:14,663] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: docker_dag.docker_command 2021-11-03T13:51:12.410942+00:00 [queued]>
[2021-11-03 13:51:14,676] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: docker_dag.docker_command 2021-11-03T13:51:12.410942+00:00 [queued]>
[2021-11-03 13:51:14,677] {taskinstance.py:1095} INFO -
--------------------------------------------------------------------------------
[2021-11-03 13:51:14,677] {taskinstance.py:1096} INFO - Starting attempt 1 of 2
[2021-11-03 13:51:14,677] {taskinstance.py:1097} INFO -
--------------------------------------------------------------------------------
[2021-11-03 13:51:14,682] {taskinstance.py:1115} INFO - Executing <Task(DockerOperator): docker_command> on 2021-11-03T13:51:12.410942+00:00
[2021-11-03 13:51:14,686] {standard_task_runner.py:52} INFO - Started process 1174765 to run task
[2021-11-03 13:51:14,688] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'docker_dag', 'docker_command', '2021-11-03T13:51:12.410942+00:00', '--job-id', '8647', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/docker-dag.py', '--cfg-path', '/tmp/tmpbgq__vi1', '--error-file', '/tmp/tmp7yyvqapv']
[2021-11-03 13:51:14,689] {standard_task_runner.py:77} INFO - Job 8647: Subtask docker_command
[2021-11-03 13:51:14,721] {logging_mixin.py:109} INFO - Running <TaskInstance: docker_dag.docker_command 2021-11-03T13:51:12.410942+00:00 [running]> on host my-made-up-host.com
[2021-11-03 13:51:14,760] {taskinstance.py:1254} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=docker_dag
AIRFLOW_CTX_TASK_ID=docker_command
AIRFLOW_CTX_EXECUTION_DATE=2021-11-03T13:51:12.410942+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-11-03T13:51:12.410942+00:00
[2021-11-03 13:51:14,779] {docker.py:246} INFO - Starting docker container from image centos:latest
[2021-11-03 13:51:14,782] {taskinstance.py:1463} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 268, in _raise_for_status
response.raise_for_status()
File "/home/airflow/.local/lib/python3.6/site-packages/requests/models.py", line 953, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 500 Server Error: Internal Server Error for url: http+docker://localhost/v1.41/containers/create
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1313, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 343, in execute
return self._run_image()
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 253, in _run_image
return self._run_image_with_mounts(self.mounts + [tmp_mount], add_tmp_variable=True)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 293, in _run_image_with_mounts
tty=self.tty,
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/container.py", line 428, in create_container
return self.create_container_from_config(config, name)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/container.py", line 439, in create_container_from_config
return self._result(res, True)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 274, in _result
self._raise_for_status(response)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 270, in _raise_for_status
raise create_api_error_from_http_exception(e)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/errors.py", line 31, in create_api_error_from_http_exception
raise cls(e, response=response, explanation=explanation)
docker.errors.APIError: 500 Server Error for http+docker://localhost/v1.41/containers/create: Internal Server Error ("json: cannot unmarshal string into Go struct field HostConfig.HostConfig.Mounts of type mount.Mount")
[2021-11-03 13:51:14,784] {taskinstance.py:1513} INFO - Marking task as UP_FOR_RETRY. dag_id=docker_dag, task_id=docker_command, execution_date=20211103T135112, start_date=20211103T135114, end_date=20211103T135114
[2021-11-03 13:51:14,823] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-11-03 13:51:14,848] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check
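Since the unmarshal error complains about HostConfig.Mounts being of type mount.Mount, I'm wondering whether the docker provider shipped with 2.1.4 expects docker.types.Mount objects rather than "source:target" bind strings, and whether the permissions/privilege guess above is even the right direction. The next thing I was planning to try is something like this (untested on my side, paths unchanged):

    from docker.types import Mount

    t2 = DockerOperator(
        task_id='docker_command',
        image='centos:latest',
        api_version='auto',
        auto_remove=True,
        command="/bin/sleep 5",
        docker_url='unix://var/run/docker.sock',
        # bind-mount the shared drive path instead of passing a plain string
        mounts=[Mount(source="/airflow/dev/tmp", target="/airflow/dev/tmp", type="bind")],
        network_mode='bridge'
    )

Does that look right, or is there something else I need to pass through from the celery worker container?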