
I'm trying to run an EMR job through the Java SDK, but it never starts.

I'm pasting the code I'm using below. I've also looked through the documentation, but it didn't help much.

package com.zedo.aws.emr;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;

public class ExampleEMR {

    /**
     * @param args
     */
    public static void main(String[] args) {

        AWSCredentials credentials = new BasicAWSCredentials("<my key>", "<my secret key>");
        AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials);

        StepFactory stepFactory = new StepFactory();

        StepConfig enableDebugging = new StepConfig()
            .withName("Enable Debugging")
            .withActionOnFailure("TERMINATE_JOB_FLOW")
            .withHadoopJarStep(stepFactory.newEnableDebuggingStep());

        StepConfig installHive = new StepConfig()
            .withName("Install Hive")
            .withActionOnFailure("TERMINATE_JOB_FLOW")
            .withHadoopJarStep(stepFactory.newInstallHiveStep());

        StepConfig hiveScript = new StepConfig()
            .withName("Hive Script")
            .withActionOnFailure("TERMINATE_JOB_FLOW")
            .withHadoopJarStep(stepFactory.newRunHiveScriptStep("s3://<path to script>"));
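        // NOTE: hiveScript is built above but never added to the RunJobFlowRequest below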

        RunJobFlowRequest request = new RunJobFlowRequest()
            .withName("Hive Interactive")
            .withSteps(enableDebugging, installHive)
            .withLogUri("s3://myawsbucket/")
            .withInstances(new JobFlowInstancesConfig()
                .withEc2KeyName("<my key>")
                .withHadoopVersion("0.20")
                .withInstanceCount(5)
                .withKeepJobFlowAliveWhenNoSteps(true)
                .withMasterInstanceType("m1.small")
                .withSlaveInstanceType("m1.small"));

        RunJobFlowResult result = emr.runJobFlow(request);

    }

}

Alternatively, could someone point me to some example links?


3 Answers


This works for me:

public void runScriptClientes(Calendar executionDate) {

    // create the AWS credentials
    BasicAWSCredentials awsCreds = new BasicAWSCredentials(rb.getString("awsAccessKey"),
            rb.getString("awsSecretKey"));

    // create the EMR client and set its region
    AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(awsCreds);
    emr.setRegion(Region.getRegion(Regions.EU_WEST_1));

    // work out which folders need processing
    Map<String, FolderS3> s3DataToProcessInput = getRutasInput(executionDate);
    //Map<String, Boolean> s3DataToProcessOut = getRutaInput();

    for (Entry<String, FolderS3> bucket_ : s3DataToProcessInput.entrySet()){
        String nameBucket = bucket_.getKey();
        FolderS3 folderS3 = bucket_.getValue();
        // check that the folder exists in the bucket
        if(folderS3.getExistInBucket()){
            listaConcurrente.add(folderS3);
            StepFactory stepFactory = new StepFactory();

            StepConfig stepHive = new StepConfig()
                    .withName(rb.getString("nameStepClientesS3")+":"+nameBucket) /* name of the step to run */
                    .withActionOnFailure(ActionOnFailure.CONTINUE) /* action to take if the step fails */
                    .withHadoopJarStep(
                            stepFactory.newRunHiveScriptStep(rb.getString("scriptClienteS3"), 
                                    "-d", "s3DataToProcess=s3://"+rb.getString("bucketPropio")+"/"+rb.getString("ruta_input_c1")+folderS3.getNameKey(),
                                    "-d", "s3DataToProcessOut=s3://"+rb.getString("bucketPropioOUT")+"/"+rb.getString("ruta_output_c1")+folderS3.getOutputFolder(),
                                    "-d", "windowTime=tablaparametro"));

            AddJobFlowStepsRequest jobFlow = new AddJobFlowStepsRequest().withJobFlowId(rb.getString("jobflowID"))
                    .withSteps(stepHive);

            // poll while the step state is PENDING or RUNNING
            AddJobFlowStepsResult result = emr.addJobFlowSteps(jobFlow);
            List<String> id = result.getStepIds();
            DescribeStepRequest describe = new DescribeStepRequest().withStepId(id.get(0));
            describe.setClusterId(rb.getString("jobflowID"));
            describe.setRequestCredentials(awsCreds); 
            DescribeStepResult res = emr.describeStep(describe);
            StepStatus status = res.getStep().getStatus();
            String stas = status.getState();

            while (stas.equals(StepExecutionState.PENDING.name()) || stas.equals(StepExecutionState.RUNNING.name())){
                try {
                    Thread.sleep(5000);
                    res = emr.describeStep(describe);
                    status = res.getStep().getStatus();
                    stas = status.getState();
                    log.info(stas);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            if (stas.equals(StepExecutionState.COMPLETED.name())) {
                folderS3.setProcessedInput(Boolean.TRUE);
                listaConcurrente.remove(folderS3);
                log.info("Step finalizado ok : "+folderS3 );
            }else if(stas.equals(StepExecutionState.FAILED.name()) || stas.equals(StepExecutionState.CANCELLED.name())){
                listaConcurrente.remove(folderS3);
                folderS3.setProcessedInput(Boolean.FALSE);
                listaConcurrente.add(folderS3);
                log.info("Step Fallo o fue Cancelado : "+folderS3 );
            }

            // read the result data and load it into the database

        }
    }
}
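As an aside, recent builds of the v1 SDK ship built-in waiters, so the manual PENDING/RUNNING polling loop above can be replaced with a single blocking call. A sketch, assuming a v1 SDK new enough to include AmazonElasticMapReduceWaiters (the emr, clusterId, and stepId values are placeholders):

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest;
import com.amazonaws.waiters.WaiterParameters;

public class StepWaiterSketch {
    // Blocks until the step leaves PENDING/RUNNING; throws a waiter exception
    // if the step fails or is cancelled instead of completing.
    public static void waitForStep(AmazonElasticMapReduce emr, String clusterId, String stepId) {
        emr.waiters()
           .stepComplete()
           .run(new WaiterParameters<>(new DescribeStepRequest()
                   .withClusterId(clusterId)
                   .withStepId(stepId)));
    }
}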
Answered 2014-09-25T16:19:44.590

Here is a link you can refer to:

http://mpouttuclarke.wordpress.com/2011/06/24/how-to-run-an-elastic-mapreduce-job-using-the-java-sdk/

Note: some of the methods used in the approach above are deprecated. For an updated version, refer to the AWS reference guide.
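For example, on current releases of the v1 SDK the client is created through a builder, and the cluster is described by a release label plus an application list rather than a Hadoop version and an install-Hive step. A minimal sketch of that shape (the region, release label, instance types, and role names below are assumptions, not values from the question):

import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.Application;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;

public class ModernEmrSketch {
    public static void main(String[] args) {
        // Credentials come from the default provider chain instead of being hardcoded.
        AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
                .withRegion(Regions.EU_WEST_1)          // assumed region
                .build();

        RunJobFlowRequest request = new RunJobFlowRequest()
                .withName("Hive Interactive")
                .withReleaseLabel("emr-5.36.0")          // replaces withHadoopVersion
                .withApplications(new Application().withName("Hive")) // replaces the install-Hive step
                .withLogUri("s3://myawsbucket/")
                .withServiceRole("EMR_DefaultRole")      // assumed default EMR roles
                .withJobFlowRole("EMR_EC2_DefaultRole")
                .withInstances(new JobFlowInstancesConfig()
                        .withInstanceCount(3)
                        .withKeepJobFlowAliveWhenNoSteps(true)
                        .withMasterInstanceType("m5.xlarge")
                        .withSlaveInstanceType("m5.xlarge"));

        RunJobFlowResult result = emr.runJobFlow(request);
        System.out.println("Started cluster " + result.getJobFlowId());
    }
}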

Answered 2013-03-12T19:11:35.443

I solved this in the end by correcting my keys.
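If anyone hits the same thing: one way to avoid key typos entirely is to let the SDK resolve credentials itself. A sketch using the v1 SDK's default provider chain (environment variables, system properties, ~/.aws/credentials, or an instance profile):

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;

public class CredentialsSketch {
    public static AmazonElasticMapReduce buildClient() {
        // Resolve keys from the environment / ~/.aws/credentials / instance profile
        // rather than hardcoding them in the source.
        return new AmazonElasticMapReduceClient(new DefaultAWSCredentialsProviderChain());
    }
}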

Answered 2013-03-12T12:21:47.883