
I'm trying to run a simple AWS Data Pipeline for my POC. My scenario is the following: take data from CSV files stored on S3, run a simple Hive query against them, and put the results back to S3.

I've created a very basic pipeline definition and tried to run it on different EMR releases, 4.2.0 and 5.3.1, and it fails on both, although in different places.

The pipeline definition is as follows:

{
  "objects": [
    {
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "maximumRetries": "1",
      "enableDebugging": "true",
      "name": "EmrCluster",
      "keyPair": "Jeff Key Pair",
      "id": "EmrClusterId_CM5Td",
      "releaseLabel": "emr-5.3.1",
      "region": "us-west-2",
      "type": "EmrCluster",
      "terminateAfter": "1 Day"
    },
    {
      "column": [
        "policyID INT",
        "statecode STRING"
      ],
      "name": "SampleCSVOutputFormat",
      "id": "DataFormatId_9sLJ0",
      "type": "CSV"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "s3://aws-logs/datapipeline/",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    },
    {
      "directoryPath": "s3://data-pipeline-input/",
      "dataFormat": {
        "ref": "DataFormatId_KIMjx"
      },
      "name": "InputDataNode",
      "id": "DataNodeId_RyNzr",
      "type": "S3DataNode"
    },
    {
      "s3EncryptionType": "NONE",
      "directoryPath": "s3://data-pipeline-output/",
      "dataFormat": {
        "ref": "DataFormatId_9sLJ0"
      },
      "name": "OutputDataNode",
      "id": "DataNodeId_lnwhV",
      "type": "S3DataNode"
    },
    {
      "output": {
        "ref": "DataNodeId_lnwhV"
      },
      "input": {
        "ref": "DataNodeId_RyNzr"
      },
      "stage": "true",
      "maximumRetries": "2",
      "name": "HiveTest",
      "hiveScript": "INSERT OVERWRITE TABLE ${output1} select policyID, statecode from ${input1};",
      "runsOn": {
        "ref": "EmrClusterId_CM5Td"
      },
      "id": "HiveActivityId_JFqr5",
      "type": "HiveActivity"
    },
    {
      "name": "SampleCSVDataFormat",
      "column": [
        "policyID INT",
        "statecode STRING",
        "county STRING",
        "eq_site_limit FLOAT",
        "hu_site_limit FLOAT",
        "fl_site_limit FLOAT",
        "fr_site_limit FLOAT",
        "tiv_2011 FLOAT",
        "tiv_2012 FLOAT",
        "eq_site_deductible FLOAT",
        "hu_site_deductible FLOAT",
        "fl_site_deductible FLOAT",
        "fr_site_deductible FLOAT",
        "point_latitude FLOAT",
        "point_longitude FLOAT",
        "line STRING",
        "construction STRING",
        "point_granularity INT"
      ],
      "id": "DataFormatId_KIMjx",
      "type": "CSV"
    }
  ],
  "parameters": []
}

The CSV files look like this:

policyID,statecode,county,eq_site_limit,hu_site_limit,fl_site_limit,fr_site_limit,tiv_2011,tiv_2012,eq_site_deductible,hu_site_deductible,fl_site_deductible,fr_site_deductible,point_latitude,point_longitude,line,construction,point_granularity
119736,FL,CLAY COUNTY,498960,498960,498960,498960,498960,792148.9,0,9979.2,0,0,30.102261,-81.711777,Residential,Masonry,1
448094,FL,CLAY COUNTY,1322376.3,1322376.3,1322376.3,1322376.3,1322376.3,1438163.57,0,0,0,0,30.063936,-81.707664,Residential,Masonry,3
206893,FL,CLAY COUNTY,190724.4,190724.4,190724.4,190724.4,190724.4,192476.78,0,0,0,0,30.089579,-81.700455,Residential,Wood,1

The HiveActivity is just a simple query (copied from the AWS documentation):

"INSERT OVERWRITE TABLE ${output1} select policyID, statecode from ${input1};"

But it fails when running on emr-5.3.1:

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.tez.TezTask
/mnt/taskRunner/./hive-script:617:in `<main>': Error executing cmd: /usr/share/aws/emr/scripts/hive-script "--base-path" "s3://us-west-2.elasticmapreduce/libs/hive/" "--hive-versions" "latest" "--run-hive-script" "--args" "-f"

Digging into the logs, I found the following exception:

2017-02-25T00:33:00,434 ERROR [316e5d21-dfd8-4663-a03c-2ea4bae7b1a0 main([])]: tez.DagUtils (:()) - Could not find the jar that was being uploaded
2017-02-25T00:33:00,434 ERROR [316e5d21-dfd8-4663-a03c-2ea4bae7b1a0 main([])]: exec.Task (:()) - Failed to execute tez graph.
java.io.IOException: Previous writer likely failed to write hdfs://ip-170-41-32-05.us-west-2.compute.internal:8020/tmp/hive/hadoop/_tez_session_dir/31ae6d21-dfd8-4123-a03c-2ea4bae7b1a0/emr-hive-goodies.jar. Failing because I am unlikely to write too.
    at org.apache.hadoop.hive.ql.exec.tez.DagUtils.localizeResource(DagUtils.java:1022)
    at org.apache.hadoop.hive.ql.exec.tez.DagUtils.addTempResources(DagUtils.java:902)
    at org.apache.hadoop.hive.ql.exec.tez.DagUtils.localizeTempFilesFromConf(DagUtils.java:845)
    at org.apache.hadoop.hive.ql.exec.tez.TezSessionState.refreshLocalResourcesFromConf(TezSessionState.java:466)
    at org.apache.hadoop.hive.ql.exec.tez.TezTask.updateSession(TezTask.java:294)
    at org.apache.hadoop.hive.ql.exec.tez.TezTask.execute(TezTask.java:155)

When running on emr-4.2.0, I get a different crash:

Number of reduce tasks is set to 0 since there's no reduce operator
java.lang.NullPointerException
    at org.apache.hadoop.fs.Path.<init>(Path.java:105)
    at org.apache.hadoop.fs.Path.<init>(Path.java:94)
    at org.apache.hadoop.hive.ql.exec.Utilities.toTempPath(Utilities.java:1517)
    at org.apache.hadoop.hive.ql.exec.Utilities.createTmpDirs(Utilities.java:3555)
    at org.apache.hadoop.hive.ql.exec.Utilities.createTmpDirs(Utilities.java:3520)

Both S3 and the EMR cluster are in the same region and run under the same AWS account. I've tried a bunch of experiments with the S3DataNode and EmrCluster configurations, but it always crashes. I also couldn't find any working example of a Data Pipeline with a HiveActivity, neither in the documentation nor on GitHub.

Can someone help me figure this out? Thank you.


1 Answer


I ran into the same problem when updating my EMR cluster from a 4.x release to release 5.28.0. After changing the release label, I followed @andrii-gorishnii's comment and added

delete jar /mnt/taskRunner/emr-hive-goodies.jar;

to the beginning of my Hive script, and it solved my problem! Thanks @andrii-gorishnii
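For reference, the script I run ends up looking roughly like this (the query is just the one from the question; only the delete jar line is the actual fix):

-- Unregister the stale jar resource added by the task runner before running the query
delete jar /mnt/taskRunner/emr-hive-goodies.jar;
INSERT OVERWRITE TABLE ${output1} select policyID, statecode from ${input1};

As far as I can tell, delete jar removes the emr-hive-goodies.jar resource that the task runner registers with Hive, so Tez no longer tries to localize the jar that triggered the "Previous writer likely failed to write" error.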

answered 2020-02-25T19:40:54.990