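I am trying to set up an Airflow ETL pipeline that extracts images from .bag files. I want to run the extraction in Docker, so I am using DockerOperator. The Docker image is pulled from a private GitLab registry. The script I want to run is a Python script inside the Docker container. The .bag file is on my external SSD, so I am trying to mount that drive into the container. Is something wrong with my code, or is this a different kind of problem?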

Error:

[2021-09-16 10:39:17,010] {docker.py:246} INFO - Starting docker container from image registry.gitlab.com/url/of/gitlab:a24a3f05
[2021-09-16 10:39:17,010] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 343, in execute
    return self._run_image()
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 265, in _run_image
    return self._run_image_with_mounts(self.mounts, add_tmp_variable=False)
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 287, in _run_image_with_mounts
    privileged=self.privileged,
  File "/usr/lib/python3/dist-packages/docker/api/container.py", line 607, in create_host_config
    return HostConfig(*args, **kwargs)
TypeError: __init__() got an unexpected keyword argument 'mounts'
[2021-09-16 10:39:17,014] {taskinstance.py:1512} INFO - Marking task as FAILED. dag_id=ETL-test, task_id=docker_extract, execution_date=20210916T083912, start_date=20210916T083915, end_date=20210916T083917
[2021-09-16 10:39:17,062] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-09-16 10:39:17,085] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check

Here is my code:

from airflow import DAG
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from airflow.operators.dummy import DummyOperator
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount
from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator is deprecated in Airflow 2

ssd_dir=Mount(source='/media/filip/external-ssd', target='/external-ssd', type='bind')

dag = DAG(
    'ETL-test',
    default_args={
        'owner': 'admin',
        'description': 'Extract data from bag, simple test',
        'depends_on_past': False,  # was misspelled as 'depend_on_past'
        'start_date': datetime(2021, 9, 13),
    },
)

start_dag = DummyOperator(
    task_id='start_dag',
    dag=dag,
)

extract = DockerOperator(
    api_version="auto",
    task_id='docker_extract',
    image='registry.gitlab.com/url/of/gitlab:a24a3f05',
    container_name='extract-test',
    mounts=[ssd_dir],
    auto_remove=True,
    force_pull=False,
    mount_tmp_dir=False,
    command='python3 rgb_image_extraction.py --bagfile /external-ssd/2021-09-01-13-17-10.bag --output_dir /external-ssd/airflow --camera_topic /kirby1/vm0/stereo/left/color/image_rect --every_n_img 20 --timestamp_as_name',
    docker_conn_id='gitlab_registry',
    dag=dag,
)

test = BashOperator(
    task_id='print_hello',
    bash_command='echo "hello world"',
    dag=dag,
)

start_dag >> extract >> test

1 Answer

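I think you have an old version of the docker Python library installed. If you want to make sure Airflow 2.1.0 works properly, you should always install it with the constraints mechanism described in https://airflow.apache.org/docs/apache-airflow/stable/installation.html, otherwise you risk ending up with outdated dependencies.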

For example, if you are on Python 3.6, the right constraints file is https://raw.githubusercontent.com/apache/airflow/constraints-2.1.3/constraints-3.6.txt, and the docker Python library pinned there is 5.0.0. I bet you have a much older version.
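
To check this, something along the following lines might help. It is only a rough sketch: the helper names `constraints_url` and `installed_version` are made up for illustration, and `importlib.metadata` needs Python 3.8+ (on Python 3.6, as in the question, the `importlib_metadata` backport or a plain `pip show docker` would be needed instead). Note also that the traceback shows docker being loaded from /usr/lib/python3/dist-packages while Airflow lives under ~/.local, which suggests the system-wide docker package is the one being picked up.

```python
import sys
from importlib import metadata  # Python 3.8+; assumption: not available on 3.6


def constraints_url(airflow_version, python_version=None):
    """Build the constraints-file URL for an Airflow/Python version pair."""
    if python_version is None:
        # Default to the major.minor version of the running interpreter.
        python_version = "{}.{}".format(*sys.version_info[:2])
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        "constraints-{}/constraints-{}.txt".format(airflow_version, python_version)
    )


def installed_version(package):
    """Return the installed version of a package, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Shows which docker library version this interpreter resolves.
    print("docker library:", installed_version("docker"))
    url = constraints_url("2.1.3", "3.6")
    # Prints the install command to run by hand; nothing is installed here.
    print('pip install "apache-airflow==2.1.3" --constraint "{}"'.format(url))
```

If the reported docker version is well below 5.0.0, reinstalling Airflow with the printed constraint command should pull in a docker library whose `HostConfig` accepts the `mounts` argument.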

Answered 2021-09-16T20:06:38.913