I'm new to Airflow. I'm trying to run a container from Airflow, but the task fails with a timeout error:
[2021-07-21 07:02:06,176] {docker.py:231} INFO - Starting docker container from image python:3.9.2-slim
[2021-07-21 07:03:06,171] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 426, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 421, in _make_request
httplib_response = conn.getresponse()
File "/usr/local/lib/python3.6/http/client.py", line 1379, in getresponse
response.begin()
File "/usr/local/lib/python3.6/http/client.py", line 311, in begin
version, status, reason = self._read_status()
File "/usr/local/lib/python3.6/http/client.py", line 272, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/lib/python3.6/socket.py", line 586, in readinto
return self._sock.recv_into(b)
socket.timeout: timed out
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/requests/adapters.py", line 449, in send
timeout=timeout
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 727, in urlopen
method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/util/retry.py", line 410, in increment
raise six.reraise(type(error), error, _stacktrace)
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/packages/six.py", line 735, in reraise
raise value
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
chunked=chunked,
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 428, in _make_request
self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 336, in _raise_timeout
self, url, "Read timed out. (read timeout=%s)" % timeout_value
urllib3.exceptions.ReadTimeoutError: HTTPConnectionPool(host='host.docker.internal', port=2375): Read timed out. (read timeout=60)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 319, in execute
return self._run_image()
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 258, in _run_image
tty=self.tty,
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/container.py", line 430, in create_container
return self.create_container_from_config(config, name)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/container.py", line 440, in create_container_from_config
res = self._post_json(u, data=config, params=params)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 296, in _post_json
return self._post(url, data=json.dumps(data2), **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/utils/decorators.py", line 46, in inner
return f(self, *args, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 233, in _post
return self.post(url, **self._set_request_timeout(kwargs))
File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 590, in post
return self.request('POST', url, data=data, json=json, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 542, in request
resp = self.send(prep, **send_kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 655, in send
r = adapter.send(request, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/requests/adapters.py", line 529, in send
raise ReadTimeout(e, request=request)
requests.exceptions.ReadTimeout: HTTPConnectionPool(host='host.docker.internal', port=2375): Read timed out. (read timeout=60)
[2021-07-21 07:03:06,179] {taskinstance.py:1551} INFO - Marking task as UP_FOR_RETRY. dag_id=etl_in_ch, task_id=etl_in_ch, execution_date=20210721T070203, start_date=20210721T070205, end_date=20210721T070306
[2021-07-21 07:03:06,215] {local_task_job.py:149} INFO - Task exited with return code 1
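The final exception names both the endpoint the Docker client called and the read timeout that was in effect. The 60 seconds reported here matches the default timeout of the Docker SDK for Python, which is my observation and not something the log states. Below is a minimal sketch for pulling those values out of a log line, so that a later run shows whether a changed timeout actually reached the client. The helper name `parse_read_timeout` is hypothetical and not part of Airflow or the Docker SDK.

```python
import re

# Matches the message produced by urllib3/requests in the traceback above.
READ_TIMEOUT_RE = re.compile(
    r"HTTPConnectionPool\(host='(?P<host>[^']+)', port=(?P<port>\d+)\): "
    r"Read timed out\. \(read timeout=(?P<timeout>[\d.]+)\)"
)


def parse_read_timeout(line):
    """Return (host, port, timeout_seconds) from a ReadTimeout log line.

    Returns None when the line does not contain a read-timeout message.
    """
    match = READ_TIMEOUT_RE.search(line)
    if match is None:
        return None
    return match["host"], int(match["port"]), float(match["timeout"])
```

If the timeout printed in the log stays at 60 after a configuration change, the change did not reach the client that the DockerOperator uses.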
I'm on a Mac and have exposed the Docker socket as described here: https://github.com/puckel/docker-airflow/issues/543#issuecomment-741842728
My Airflow code is:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.operators.dummy import DummyOperator

default_args = {
    'owner': 'airflow',
    'description': 'Extract data from different sources into CH and train model with it',
    'depends_on_past': False,
    'start_date': datetime(2021, 7, 19),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
with DAG('etl_in_ch', default_args=default_args, schedule_interval="00 23 * * *", catchup=False) as dag:
    start_dag = DummyOperator(
        task_id='start_dag'
    )

    end_dag = DummyOperator(
        task_id='end_dag'
    )

    t1 = DockerOperator(
        task_id='etl_in_ch',
        image='python:3.9.2-slim',
        container_name='etl_in_ch',
        api_version='auto',
        auto_remove=True,
        command="apt-get update && apt-get install -y cron && apt-get install -y libxml2 libxslt-dev wget bzip2 gcc \
            && pip install --no-cache-dir --upgrade pip \
            && pip install --no-cache-dir poetry==1.1.5 \
            && poetry config virtualenvs.create false\
            && poetry install --no-interaction --no-ansi \
            && chmod +x /src/__main__.py \
            && python __main__.py",
        docker_url="tcp://host.docker.internal:2375",
        network_mode="bridge",
        environment={"PYTHONDONTWRITEBYTECODE": 1, "PYTHONUNBUFFERED": 1},
        working_dir="/usr/src/copy_data",
        mounts=['./CH_ETL/src:/usr/src/copy_data', './pyproject.toml:pyproject.toml'],
        xcom_all=True
    )

    start_dag >> t1
    t1 >> end_dag
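I've read that I may need to increase the Docker timeout, but I don't know exactly where to set it. I've already tried changing it on my machine, inside the Airflow worker, and inside the bobrik/socat container. None of that helped.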
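To separate "the daemon is slow" from "the daemon is not answering at all", the endpoint can be probed without Airflow. Below is a minimal sketch using only the standard library. It assumes the daemon is exposed over plain HTTP on the TCP port (as with the socat setup above) and uses the Docker Engine API `/_ping` endpoint. The function name `ping_docker_daemon` and the 5-second default are my own choices for illustration.

```python
import http.client
import socket
import time


def ping_docker_daemon(host, port, timeout=5.0):
    """Send GET /_ping to a Docker daemon exposed over plain TCP.

    Returns (ok, detail, elapsed_seconds) and does not raise on
    network errors, so it can be called from any shell or task.
    """
    started = time.monotonic()
    conn = http.client.HTTPConnection(host, port, timeout=timeout)
    try:
        conn.request("GET", "/_ping")
        response = conn.getresponse()
        body = response.read().decode("utf-8", errors="replace").strip()
        ok = response.status == 200
        detail = f"HTTP {response.status}: {body}"
    except socket.timeout:
        # The TCP port accepted the connection (or is filtered) but no
        # answer arrived in time, the same symptom as in the traceback.
        ok, detail = False, f"timed out after {timeout}s"
    except (OSError, http.client.HTTPException) as exc:
        # DNS failure, connection refused, or a malformed HTTP reply.
        ok, detail = False, f"connection failed: {exc}"
    finally:
        conn.close()
    return ok, detail, time.monotonic() - started
```

Calling `ping_docker_daemon("host.docker.internal", 2375)` from a Python shell inside the Airflow worker container should show which case applies. A quick `HTTP 200: OK` would suggest the proxy works and only the container-creation call is slow, while a timeout here would point at the socket forwarding, not at the timeout value.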