4

我正在使用 Java Sdk 创建胶水作业。它只有两个必需的参数 Command 和 Glue 版本。但我需要使用自动脚本生成来创建工作。正如我们可以从控制台做的那样,我们添加数据源、AWS Glue 生成的建议脚本、转换类型、数据目标、架构 n 全部。如何使用 java sdk 甚至使用 aws 胶水 api 将这些参数添加到胶水作业中。

           CreateJobRequest req = new CreateJobRequest();
            req.setName("TestJob2");
            req.setRole("GlueS3Role");
            req.setGlueVersion("1.0");
            JobCommand command = new JobCommand();
            command.setName("glueetl");
            command.setPythonVersion("3");
            **// S3 location need not to be given, as script code is auto generated by AWS glue
           command.setScriptLocation(S3ScriptLocation);**
            req.setCommand(command);

            AWSGlue glueClient = AWSGlueClientBuilder.standard()
                                 .withRegion(Regions.US_EAST_1)
                                 .withCredentials(new AWSStaticCredentialsProvider(creds))
                                 .build();

            glueClient.createJob(req);
        }
4

2 回答 2

1

我希望 AWS Glue 客户端和触发作业的逻辑的实施将有助于以同样的方式实现胶水作业的自动生成。

胶客户:

public GlueClient createClient() {
        return GlueClient.builder()
                .region(Region.of(regionName))
                .credentialsProvider(ProfileCredentialsProvider.create(profileName)).build();

胶水工作赛跑者:

public static String runGlueJob(GlueClient glueClient, String jobName, Map<String, String> glueArguments) {

        StartJobRunResponse response = glueClient.startJobRun(StartJobRunRequest.builder().jobName(jobName).arguments(glueArguments).build());
        String jobId = response.jobRunId();
        logger.info("JobId: " + jobId);

        return jobId;

    }

要创建新的 AWS Glue 作业定义,我们可以执行以下操作:

CreateJobResult jobResult = glueClient.createJob(CreateJobRequest.builder()
                .command(JobCommand.builder().pythonVersion("").scriptLocation("").name("").build())
                .defaultArguments()
                .description()
                .glueVersion()
                .logUri()
                .name()
                .numberOfWorkers()
                .role()
                .tags()
                .build());            

然后触发工作

https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/glue/AWSGlueClient.html

于 2020-05-13T22:19:23.100 回答
0

您正在寻找的是createScript(CreateScriptRequest request)- AWSJavaSDK AWSGlueClient类中的一个函数

不幸的是,当前版本的 AWS Glue SDK 不包含用于生成 ETL 脚本的简单功能。在创建作业功能中生成 ETL 脚本时,AWS Glue 控制台本身会在后台执行多项操作(您可以通过查看浏览器的网络选项卡来查看这一点)。

通过使用“DAG”来模仿它

您将需要收集CodeGenNode&CodeGenEdge并将它们添加到您的CreateScriptRequestwith

.WithDagNodes(Collection<CodeGenNode> collection)

&

.WithDagEdges(Collection<CodeGenEdge> collection)

我建议您首先在 AWS 控制台中生成一个 ETL 脚本,并在“生成 Scala 代码”示例中交叉引用该脚本(此链接是为了让您更好地理解“DAG”)

我最终明确地构建了这个 DAG 结构。这是我的解决方案的片段:

    var dagNodes = new ArrayList<CodeGenNode>();
    var dagEdges = new ArrayList<CodeGenEdge>();

    //datasource
    dagEdges.add(new CodeGenEdge().withSource(dataSourceName).withTarget(applyMappingName));
    ArrayList<CodeGenNodeArg> dataSourceArgs = new ArrayList<CodeGenNodeArg>();
    dataSourceArgs.add(new CodeGenNodeArg().withName("database").withValue(String.format("\"%s\"", databaseName)));
    dataSourceArgs.add(new CodeGenNodeArg().withName("table_name").withValue(String.format("\"%s\"", tableName)));
    dataSourceArgs.add(new CodeGenNodeArg().withName("transformation_ctx").withValue(String.format("\"%s\"", dataSourceName)));
    dagNodes.add(new CodeGenNode().withId(dataSourceName).withNodeType("DataSource").withArgs(dataSourceArgs));

... //can build out many 'operations' - datasource, applymapping, selectfields, resolvechoice, datasink

    var createScriptRequest = new CreateScriptRequest()
        .withDagEdges(dagEdges)
        .withDagNodes(dagNodes)
        .withLanguage(Language.PYTHON);

    awsGlueClient.createScript(createScriptRequest)

然后只需使用AmazonS3将此结果上传到 S3并将此路径用于“setScriptLocation”

PutObjectResult putObject(String bucketName, String key, String content)

于 2020-07-14T23:09:41.667 回答