我正在尝试设置从 .bag 文件中提取图像的 Airflow ETL 管道。我想在 docker 中提取它,我正在使用 DockerOperator。Docker 镜像是从私有 GitLab 存储库中提取的。我要运行的脚本是 Docker 容器内的 python 脚本。.bag 文件在我的外部 SSD 上,所以我试图将它安装在 docker 中。代码有问题还是另一种问题?
错误:
[2021-09-16 10:39:17,010] {docker.py:246} INFO - Starting docker container from image registry.gitlab.com/url/of/gitlab:a24a3f05
[2021-09-16 10:39:17,010] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/filip/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/filip/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/filip/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
result = task_copy.execute(context=context)
File "/home/filip/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 343, in execute
return self._run_image()
File "/home/filip/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 265, in _run_image
return self._run_image_with_mounts(self.mounts, add_tmp_variable=False)
File "/home/filip/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 287, in _run_image_with_mounts
privileged=self.privileged,
File "/usr/lib/python3/dist-packages/docker/api/container.py", line 607, in create_host_config
return HostConfig(*args, **kwargs)
TypeError: __init__() got an unexpected keyword argument 'mounts'
[2021-09-16 10:39:17,014] {taskinstance.py:1512} INFO - Marking task as FAILED. dag_id=ETL-test, task_id=docker_extract, execution_date=20210916T083912, start_date=20210916T083915, end_date=20210916T083917
[2021-09-16 10:39:17,062] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-09-16 10:39:17,085] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check
这是我的代码:
from airflow import DAG
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from airflow.operators.dummy import DummyOperator
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount
from airflow.operators.bash_operator import BashOperator
ssd_dir=Mount(source='/media/filip/external-ssd', target='/external-ssd', type='bind')
dag = DAG(
'ETL-test',
default_args = {
'owner' : 'admin',
'description' : 'Extract data from bag, simple test',
'depend_on_past' : False,
'start_date' : datetime(2021, 9, 13),
},
)
start_dag = DummyOperator(
task_id='start_dag',
dag=dag
)
extract = DockerOperator(
api_version="auto",
task_id='docker_extract',
image='registry.gitlab.com/url/of/gitlab:a24a3f05',
container_name='extract-test',
mounts=[ssd_dir],
auto_remove = True,
force_pull = False,
mount_tmp_dir=False,
command='python3 rgb_image_extraction.py --bagfile /external-ssd/2021-09-01-13-17-10.bag --output_dir /external-ssd/airflow --camera_topic /kirby1/vm0/stereo/left/color/image_rect --every_n_img 20 --timestamp_as_name',
docker_conn_id='gitlab_registry',
dag=dag
)
test = BashOperator(
task_id='print_hello',
bash_command='echo "hello world"',
dag=dag
)
start_dag >> extract >> test