0

我有一个关于从 Airflow(东京地区的 Apache Airflow 的 Amazon Managed Workflows)调用 AWS lambda(首尔地区)的问题。

问题是当我从 Airflow 调用 Lambda 函数时,Airflow UI 显示该函数失败。

奇怪的是,当我从 AWS 日志中检查这是否属实时,有问题的 Lambda 函数运行良好;状态码 200。

我听说 Airflow 的 Lambda 的超时阈值为 5 分钟。

事实上,其他不到 5 分钟的 Lambda 函数在气流中是成功的。

我的问题是:

  1. 5分钟限制是真的吗?
  2. 如果是这样,我可以在某处配置限制吗?

您的帮助将不胜感激

下面是我的代码的一部分。

config = Config(
    read_timeout=900,
    connect_timeout=900,
    retries={"max_attempts": 1}
)
# invoke lambda func
def _invoke_lambda_func(lambda_name,payload):
    lambda_hook = AwsLambdaHook(function_name=lambda_name,config=config)
    response = lambda_hook.invoke_lambda(payload=payload)
    if response['StatusCode'] == 200:
        return True

with DAG(
dag_id='lambda_test',
    default_args = args,
    dagrun_timeout = timedelta(hours=1),
    start_date = days_ago(2),
    #schedule_interval='0 3 * * *',
    schedule_interval=None,
    tags=['lambda_test'],
) as dag:
    # Start task
    for lambda_name in lambda_func_list:
        lambda_task = PythonOperator(
            task_id=f'{lambda_name}_lambda_func',
            python_callable=_invoke_lambda_func,
            op_kwargs={
                'lambda_name':lambda_name,
                'payload':'null'
            }
        )
        lambda_func[lambda_name] = lambda_task
    test = PythonOperator(
        task_id='final_func',
        python_callable=_op_complete
    )
    lambda_func['extract_message_target_users'] >> test
4

0 回答 0