我有一个关于从 Airflow(东京地区的 Apache Airflow 的 Amazon Managed Workflows)调用 AWS lambda(首尔地区)的问题。
问题是当我从 Airflow 调用 Lambda 函数时,Airflow UI 显示该函数失败。
奇怪的是,当我从 AWS 日志中检查这是否属实时,有问题的 Lambda 函数运行良好;状态码 200。
我听说 Airflow 的 Lambda 的超时阈值为 5 分钟。
事实上,其他不到 5 分钟的 Lambda 函数在气流中是成功的。
我的问题是:
- 5分钟限制是真的吗?
- 如果是这样,我可以在某处配置限制吗?
您的帮助将不胜感激
下面是我的代码的一部分。
config = Config(
read_timeout=900,
connect_timeout=900,
retries={"max_attempts": 1}
)
# invoke lambda func
def _invoke_lambda_func(lambda_name,payload):
lambda_hook = AwsLambdaHook(function_name=lambda_name,config=config)
response = lambda_hook.invoke_lambda(payload=payload)
if response['StatusCode'] == 200:
return True
with DAG(
dag_id='lambda_test',
default_args = args,
dagrun_timeout = timedelta(hours=1),
start_date = days_ago(2),
#schedule_interval='0 3 * * *',
schedule_interval=None,
tags=['lambda_test'],
) as dag:
# Start task
for lambda_name in lambda_func_list:
lambda_task = PythonOperator(
task_id=f'{lambda_name}_lambda_func',
python_callable=_invoke_lambda_func,
op_kwargs={
'lambda_name':lambda_name,
'payload':'null'
}
)
lambda_func[lambda_name] = lambda_task
test = PythonOperator(
task_id='final_func',
python_callable=_op_complete
)
lambda_func['extract_message_target_users'] >> test