首先,您需要确保作业是从工作流中运行的:
def get_worfklow_params(args: Dict[str, str]) -> Dict[str, str]:
    """
    Retrieve the run properties of the Glue workflow that started this job.

    :param args: resolved job arguments; the workflow is identified by the
        "WORKFLOW_NAME" and "WORKFLOW_RUN_ID" entries.
    :return: the "RunProperties" mapping returned by Glue's
        get_workflow_run_properties call, or None when ``args`` is None/empty
        or does not contain both workflow keys.
    """
    # Check the arguments first: without both keys there is nothing to query,
    # so no Glue client is created at all in that case.
    if not args or "WORKFLOW_NAME" not in args or "WORKFLOW_RUN_ID" not in args:
        print("Unable to find run properties for this workflow!")
        return None

    glue_client = boto3.client("glue")
    workflow_args = glue_client.get_workflow_run_properties(
        Name=args["WORKFLOW_NAME"],
        RunId=args["WORKFLOW_RUN_ID"],
    )["RunProperties"]
    print("Found the following workflow args: \n{}".format(workflow_args))
    return workflow_args
此方法将返回工作流(workflow)输入参数的映射;如果作业参数中没有工作流信息,则返回 None。
您可以使用以下方法来检索给定参数:
def get_worfklow_param(args: Dict[str, str], arg) -> str:
    """
    Look up a single parameter in an argument mapping.

    :param args: mapping of parameter names to values; may be None.
    :param arg: name of the parameter to look up.
    :return: the value stored under ``arg``, or None when ``args`` is None
        or does not contain ``arg``.
    """
    if args is not None and arg in args:
        return args[arg]
    return None
从代码复用的角度来看,我认为最好创建一个 Python(whl)模块,并将该模块配置到作业的脚本路径中。这样,您只需一条简单的 import 语句即可使用这些方法。
如果不使用该 whl 模块,您可以按以下方式处理,即把这些辅助方法直接定义在转换函数内部:
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    """
    Custom-transform snippet that reads the parameters of the workflow run
    that started the job.

    The helper functions and their imports are declared inside the transform
    so that it does not depend on a separately packaged module.

    :param glueContext: not used by the visible part of this snippet.
    :param dfc: not used by the visible part of this snippet.
    """
    import boto3
    import sys
    from typing import Dict

    def get_worfklow_params(args: Dict[str, str]) -> Dict[str, str]:
        """
        Retrieve the run properties of the workflow identified by the
        "WORKFLOW_NAME" and "WORKFLOW_RUN_ID" entries of ``args``.

        Returns the "RunProperties" mapping, or None when ``args`` is
        None/empty or lacks one of the two keys.
        """
        # Check the arguments first so that no Glue client is created when
        # there is nothing to query.
        if not args or "WORKFLOW_NAME" not in args or "WORKFLOW_RUN_ID" not in args:
            print("Unable to find run properties for this workflow!")
            return None

        glue_client = boto3.client("glue")
        workflow_args = glue_client.get_workflow_run_properties(
            Name=args["WORKFLOW_NAME"],
            RunId=args["WORKFLOW_RUN_ID"],
        )["RunProperties"]
        print("Found the following workflow args: \n{}".format(workflow_args))
        return workflow_args

    def get_worfklow_param(args: Dict[str, str], arg) -> str:
        """
        Return the value stored under ``arg`` in ``args``, or None when
        ``args`` is None or does not contain ``arg``.
        """
        if args is None:
            return None
        return args[arg] if arg in args else None

    _args = getResolvedOptions(sys.argv, ['JOB_NAME', 'WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
    worfklow_params = get_worfklow_params(_args)
    # The run id is one of the job arguments requested above.
    job_run_id = get_worfklow_param(_args, "WORKFLOW_RUN_ID")
    # Custom parameters are looked up in the workflow run properties: they were
    # not requested from getResolvedOptions, so they are not expected in _args.
    my_parameter = get_worfklow_param(worfklow_params, "WORKFLOW_CUSTOM_PARAMETER")
    # NOTE(review): the visible snippet ends here without returning the
    # DynamicFrameCollection promised by the signature - confirm that the
    # actual transform logic and return statement follow.