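I am trying to run tasks using AWS MWAA. Occasionally, in perhaps 5% of cases, a task fails with no logs and no evidence that it was ever submitted to a worker. (Edit: I noticed this happens when two tasks are sent at the same time.)

Checking the scheduler logs, I found the following Celery error raised by boto (which I do not use in my DAG):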

[2021-08-04 13:50:41,853] {{scheduler_job.py:1109}} INFO - Sending TaskInstanceKey(dag_id='XXX', task_id='XXXX', execution_date=datetime.datetime(2021, 8, 4, 13, 40, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 1 and queue airflow-celery-abc123

...


[2021-08-04 13:50:41,854] {{base_executor.py:82}} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'xxxx', 'xxxxx', '2021-08-04T13:40:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/airflow/dags/xxxx']

...

[2021-08-04 13:50:42,825] {{celery_executor.py:295}} ERROR - Error sending Celery task: Unable to parse response (no element found: line 1, column 0), invalid XML received. Further retries may succeed:

2021-08-04T13:50:43.428Z    b''

2021-08-04T13:50:43.452Z
Celery Task ID: TaskInstanceKey(dag_id='XXX', task_id='xxxxx', execution_date=datetime.datetime(2021, 8, 4, 13, 40, tzinfo=Timezone('UTC')), try_number=1)

2021-08-04T13:50:43.479Z    Traceback (most recent call last):

2021-08-04T13:50:43.510Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/parsers.py", line 452, in _parse_xml_string_to_dom

2021-08-04T13:50:43.540Z    root = parser.close()

2021-08-04T13:50:43.890Z    File "<string>", line None

2021-08-04T13:50:43.923Z    xml.etree.ElementTree.ParseError: no element found: line 1, column 0

2021-08-04T13:50:43.950Z    

2021-08-04T13:50:43.989Z    During handling of the above exception, another exception occurred:

2021-08-04T13:50:44.020Z    

2021-08-04T13:50:44.054Z    Traceback (most recent call last):

2021-08-04T13:50:44.084Z    File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 167, in send_task_to_executor

2021-08-04T13:50:44.117Z    result = task_to_run.apply_async(args=[command], queue=queue)

2021-08-04T13:50:44.150Z    File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 570, in apply_async

2021-08-04T13:50:44.178Z    **options

2021-08-04T13:50:44.207Z    File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 741, in send_task

2021-08-04T13:50:44.366Z    amqp.send_task_message(P, name, message, **options)

2021-08-04T13:50:44.393Z    File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py", line 561, in send_task_message

2021-08-04T13:50:44.427Z    **properties

2021-08-04T13:50:44.457Z    File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 181, in publish

2021-08-04T13:50:44.485Z    exchange_name, declare,

2021-08-04T13:50:44.519Z    File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 533, in _ensured

2021-08-04T13:50:44.542Z    return fun(*args, **kwargs)

2021-08-04T13:50:44.567Z    File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 194, in _publish

2021-08-04T13:50:44.593Z    [maybe_declare(entity) for entity in declare]

2021-08-04T13:50:44.686Z    File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 194, in <listcomp>

2021-08-04T13:50:44.716Z    [maybe_declare(entity) for entity in declare]

2021-08-04T13:50:44.892Z    File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 102, in maybe_declare

2021-08-04T13:50:44.929Z    return maybe_declare(entity, self.channel, retry, **retry_policy)

2021-08-04T13:50:44.963Z    File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 121, in maybe_declare

2021-08-04T13:50:44.994Z    return _maybe_declare(entity, channel)

2021-08-04T13:50:45.027Z    File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 161, in _maybe_declare

2021-08-04T13:50:45.079Z    entity.declare(channel=channel)

2021-08-04T13:50:45.108Z    File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 611, in declare

2021-08-04T13:50:45.138Z    self._create_queue(nowait=nowait, channel=channel)

2021-08-04T13:50:45.167Z    File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 620, in _create_queue

2021-08-04T13:50:45.195Z    self.queue_declare(nowait=nowait, passive=False, channel=channel)

2021-08-04T13:50:45.232Z    File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 655, in queue_declare

2021-08-04T13:50:45.610Z    nowait=nowait,

2021-08-04T13:50:45.639Z    File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 532, in queue_declare

2021-08-04T13:50:45.675Z    return queue_declare_ok_t(queue, self._size(queue), 0)

2021-08-04T13:50:45.712Z    File "/usr/local/lib/python3.7/site-packages/kombu/transport/SQS.py", line 484, in _size

2021-08-04T13:50:45.741Z    AttributeNames=['ApproximateNumberOfMessages'])

2021-08-04T13:50:45.773Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 357, in _api_call

2021-08-04T13:50:45.802Z    return self._make_api_call(operation_name, kwargs)

2021-08-04T13:50:45.826Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 663, in _make_api_call

2021-08-04T13:50:45.859Z    operation_model, request_dict, request_context)

2021-08-04T13:50:45.886Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 682, in _make_request

2021-08-04T13:50:45.911Z    return self._endpoint.make_request(operation_model, request_dict)

2021-08-04T13:50:46.066Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/endpoint.py", line 102, in make_request

2021-08-04T13:50:46.095Z    return self._send_request(request_dict, operation_model)

2021-08-04T13:50:46.122Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/endpoint.py", line 135, in _send_request

2021-08-04T13:50:46.151Z    request, operation_model, context)

2021-08-04T13:50:46.178Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/endpoint.py", line 167, in _get_response

2021-08-04T13:50:46.204Z    request, operation_model)

2021-08-04T13:50:46.233Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/endpoint.py", line 218, in _do_get_response

2021-08-04T13:50:46.259Z    response_dict, operation_model.output_shape)

2021-08-04T13:50:46.286Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/parsers.py", line 245, in parse

2021-08-04T13:50:46.315Z    parsed = self._do_parse(response, shape)

2021-08-04T13:50:46.346Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/parsers.py", line 523, in _do_parse

2021-08-04T13:50:47.279Z    return self._parse_body_as_xml(response, shape, inject_metadata=True)

2021-08-04T13:50:47.313Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/parsers.py", line 527, in _parse_body_as_xml

2021-08-04T13:50:47.344Z    root = self._parse_xml_string_to_dom(xml_contents)

2021-08-04T13:50:47.377Z    File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/parsers.py", line 457, in _parse_xml_string_to_dom

2021-08-04T13:50:47.410Z    (e, xml_string))

2021-08-04T13:50:47.440Z    botocore.parsers.ResponseParserError: Unable to parse response (no element found: line 1, column 0), invalid XML received. Further retries may succeed:

2021-08-04T13:50:47.468Z    b''


...


[2021-08-04 13:50:42,869] {{scheduler_job.py:1239}} ERROR - Executor reports task instance <TaskInstance: xxx.xxxx 2021-08-04 13:40:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?

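Judging from this post, there may be a problem with the scheduler sharing a boto session. Does anyone have tips on how to solve this?

This post and this post may also be related.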

1 Answer

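So after trying various things, it really does seem to be a concurrency problem, and the easiest way to solve it is to make things not concurrent.

Running their smallest instance size, whose scheduler has only 2 vCPUs (according to this), therefore does not have this problem.

The other option is to set celery.sync_parallelism = 1.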
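
For anyone who wants to apply that setting from code rather than the console, here is a minimal sketch, not something I have verified on every MWAA version. It assumes boto3 is installed and has credentials that may update the environment. The environment name `my-mwaa-env` and the helper names `build_update_request` and `apply_update` are made up for illustration. As far as I understand the Airflow docs, `sync_parallelism` is the number of processes the Celery executor uses to sync task state, so 1 keeps that work in a single process.

```python
def build_update_request(env_name, sync_parallelism=1):
    """Build keyword arguments for the MWAA UpdateEnvironment call.

    env_name is a placeholder; pass the name of your own environment.
    """
    return {
        "Name": env_name,
        # Configuration option values are passed as strings.
        "AirflowConfigurationOptions": {
            "celery.sync_parallelism": str(sync_parallelism),
        },
    }


def apply_update(env_name):
    """Send the update to MWAA (needs AWS credentials and network access)."""
    # Imported lazily so the builder above can be used without boto3.
    import boto3

    client = boto3.client("mwaa")
    return client.update_environment(**build_update_request(env_name))


if __name__ == "__main__":
    # Only prints the request; call apply_update("my-mwaa-env") to send it.
    print(build_update_request("my-mwaa-env"))
```

If the environment already has other configuration overrides, check whether they need to be included in the same `AirflowConfigurationOptions` map so that they are not lost by the update. The same key and value can also be entered under the Airflow configuration options in the MWAA console.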

answered 2021-08-04T17:25:25.307