1

我一直在使用 AWS Glue 工作流程来编排批处理作业。我们需要通过下推谓词来限制批处理作业的处理。当我们单独运行 Glue 作业时,我们可以在运行时将下推谓词作为命令行参数传递(即 aws glue start-job-run --job-name foo.scala --arguments --arg1-text ${arg1} ..)。但是当我们使用胶水工作流来执行胶水作业时,就有点不清楚了。

当我们使用 AWS Glue 工作流程编排批处理作业时,我们可以在创建工作流程时添加运行属性。

  1. 我可以使用运行属性为我的 Glue Job 传递下推谓词吗?
  2. 如果是,那么我如何在运行时定义运行属性(下推谓词)的值。我想在运行时为下推谓词定义值的原因是因为谓词每天都在任意变化。(即过去 10 天、过去 20 天、过去 2 天等运行胶水工作流程)

我试过了:

aws 胶水启动工作流运行--名称工作流名称 | jq -r '.RunId '

aws 胶水 put-workflow-run-properties --name 工作流名称 --run-id "ID" --run-properties --pushdownpredicate="some value"

我可以看到我使用 put-workflow-run-property 传递的运行属性

aws 胶水放置工作流运行属性 --name 工作流名称 --run-id "ID"

但我无法在我的 Glue Job 中检测到“pushdownpredicate”。知道如何在 Glue Job 中访问工作流的运行属性吗?

在此处输入图像描述

4

2 回答 2

1

如果您使用 python 作为 Glue 作业的编程语言,则可以发出get_workflow_run_properties API 调用来检索属性并在 Glue 作业中使用它。

response = client.get_workflow_run_properties(
    Name='string',
    RunId='string'
)

这将为您提供以下响应,您可以解析和使用它:

{
    'RunProperties': {
        'string': 'string'
    }
}

如果您使用的是 scala,那么您可以使用等效的 AWS 开发工具包。

于 2020-01-03T06:07:49.710 回答
0

首先,您需要确保作业是从工作流中运行的:

def get_worfklow_params(args: Dict[str, str]) -> Dict[str, str]:
    """
    get_worfklow_params is delegated to retrieve the WORKFLOW parameters
    """
    glue_client = boto3.client("glue")
    if "WORKFLOW_NAME" in args and "WORKFLOW_RUN_ID" in args:
        workflow_args = glue_client.get_workflow_run_properties(Name=args['WORKFLOW_NAME'], RunId=args['WORKFLOW_RUN_ID'])["RunProperties"]
        print("Found the following workflow args: \n{}".format(workflow_args))
        return workflow_args
    print("Unable to find run properties for this workflow!")
    return None

此方法将返回workflow输入参数的映射。

您可以使用以下方法来检索给定参数:

def get_worfklow_param(args: Dict[str, str], arg) -> str:
    """
    get_worfklow_param is delegated to verify if the given parameter is present in the job and return it. In case of no presence None will be returned
    """
    if args is None:
        return None
    return args[arg] if arg in args else None

从重用代码来看,我认为最好创建一个 python ( whl) 模块并将该模块设置在您工作的脚本路径中。通过这种方式,您可以通过简单的导入来检索方法。

如果没有该whl模块,您可以通过以下方式移动:


def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    import boto3
    import sys
    from typing import Dict

    def get_worfklow_params(args: Dict[str, str]) -> Dict[str, str]:
    """
    get_worfklow_params is delegated to retrieve the WORKFLOW parameters
    """
    glue_client = boto3.client("glue")
    if "WORKFLOW_NAME" in args and "WORKFLOW_RUN_ID" in args:
        workflow_args = glue_client.get_workflow_run_properties(
            Name=args['WORKFLOW_NAME'], RunId=args['WORKFLOW_RUN_ID'])["RunProperties"]
        print("Found the following workflow args: \n{}".format(workflow_args))
        return workflow_args
    print("Unable to find run properties for this workflow!")
    return None

    def get_worfklow_param(args: Dict[str, str], arg) -> str:
    """
    get_worfklow_param is delegated to verify if the given parameter is present in the job and return it. In case of no presence None will be returned
    """
    if args is None:
        return None
    return args[arg] if arg in args else None

    _args = getResolvedOptions(sys.argv, ['JOB_NAME', 'WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
    worfklow_params = get_worfklow_params(_args)


    job_run_id = get_worfklow_param(_args, "WORKFLOW_RUN_ID")
    my_parameter= get_worfklow_param(_args, "WORKFLOW_CUSTOM_PARAMETER")

于 2021-01-21T17:01:00.297 回答