9

我正在尝试使用 AWS 管道将 CSV 数据从 S3 存储桶传输到 DynamoDB,以下是我的管道脚本,它无法正常工作,

CSV 文件结构

Name, Designation,Company

A,TL,C1

B,Prog, C2

DynamoDb : N_Table,名称为哈希值

{
"objects": [
    {
        "id": "Default",
        "scheduleType": "cron",
        "name": "Default",
        "role": "DataPipelineDefaultRole",
        "resourceRole": "DataPipelineDefaultResourceRole"
    },
    {
        "id": "DynamoDBDataNodeId635",
        "schedule": {
            "ref": "ScheduleId639"
        },
        "tableName": "N_Table",
        "name": "MyDynamoDBData",
        "type": "DynamoDBDataNode"
    },
    {
        "emrLogUri": "s3://onlycsv/error",
        "id": "EmrClusterId636",
        "schedule": {
            "ref": "ScheduleId639"
        },
        "masterInstanceType": "m1.small",
        "coreInstanceType": "m1.xlarge",
        "enableDebugging": "true",
        "installHive": "latest",
        "name": "ImportCluster",
        "coreInstanceCount": "1",
        "logUri": "s3://onlycsv/error1",
        "type": "EmrCluster"
    },
    {
        "id": "S3DataNodeId643",
        "schedule": {
            "ref": "ScheduleId639"
        },
        "directoryPath": "s3://onlycsv/data.csv",
        "name": "MyS3Data",
        "dataFormat": {
            "ref": "DataFormatId1"
        },
        "type": "S3DataNode"
    },
    {
        "id": "ScheduleId639",
        "startDateTime": "2013-08-03T00:00:00",
        "name": "ImportSchedule",
        "period": "1 Hours",
        "type": "Schedule",
        "endDateTime": "2013-08-04T00:00:00"
    },
    {
        "id": "EmrActivityId637",
        "input": {
            "ref": "S3DataNodeId643"
        },
        "schedule": {
            "ref": "ScheduleId639"
        },
        "name": "MyImportJob",
        "runsOn": {
            "ref": "EmrClusterId636"
        },
        "maximumRetries": "0",
        "myDynamoDBWriteThroughputRatio": "0.25",
        "attemptTimeout": "24 hours",
        "type": "EmrActivity",
        "output": {
            "ref": "DynamoDBDataNodeId635"
        },
        "step": "s3://elasticmapreduce/libs/script-runner/script-runner.jar,s3://elasticmapreduce/libs/hive/hive-script,--run-hive-script,--hive-versions,latest,--args,-f,s3://elasticmapreduce/libs/hive/dynamodb/importDynamoDBTableFromS3,-d,DYNAMODB_OUTPUT_TABLE=#{output.tableName},-d,S3_INPUT_BUCKET=#{input.directoryPath},-d,DYNAMODB_WRITE_PERCENT=#{myDynamoDBWriteThroughputRatio},-d,DYNAMODB_ENDPOINT=dynamodb.us-east-1.amazonaws.com"
    },
    {
        "id": "DataFormatId1",
        "name": "DefaultDataFormat1",
        "column": [
            "Name",
            "Designation",
            "Company"
        ],
        "columnSeparator": ",",
        "recordSeparator": "\n",
        "type": "Custom"
    }
]

}

在执行管道的四个步骤中,有两个正在完成,但没有完全执行

4

4 回答 4

5

目前 (2015-04) 默认导入管道模板不支持导入 CSV 文件。

如果您的 CSV 文件不是太大(小于 1GB 左右),您可以创建一个 ShellCommandActivity 以首先将 CSV 转换为 DynamoDB JSON 格式,然后将其提供给 EmrActivity 以将生成的 JSON 文件导入您的表中。

作为第一步,您可以创建示例 DynamoDB 表,包括您需要的所有字段类型,填充虚拟值,然后使用管道导出记录(DynamoDB 控制台中的导出/导入按钮)。这将使您了解导入管道所期望的格式。类型名称不明显,并且 Import 活动对正确的大小写非常敏感(例如,您应该为布尔字段设置 bOOL)。

之后应该很容易创建一个 awk 脚本(或任何其他文本转换器,至少使用 awk,您可以将默认 AMI 图像用于您的 shell 活动),您可以将其提供给您的 shellCommandActivity。不要忘记启用“暂存”标志,以便将您的输出上传回 S3 以供 Import 活动拾取。

于 2015-04-11T20:05:46.170 回答
3

如果您使用模板数据管道将数据从 S3 导入 DynamoDB,这些数据格式将不起作用。相反,使用下面链接中的格式来存储输入 S3 数据文件http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb-pipelinejson-verifydata2.html

此格式的输出文件由将数据从 DynamoDB 导出到 S3 的模板数据管道生成。

希望有帮助。

于 2014-05-27T16:03:59.713 回答
0

我建议使用 datapipeline 提供的 CSV 数据格式,而不是自定义。

要调试集群上的错误,您可以在 EMR 控制台中查找作业流并查看失败任务的日志文件。

于 2013-11-15T21:29:42.353 回答
0

请参阅下面的链接以获取有效的解决方案(在问题部分中),尽管是 EMR 3.x。只需将分隔符更改为"columnSeparator": ",". 就个人而言,除非您确定数据已正确清理,否则我不会使用 CSV。

如何将 Data Pipeline 定义从 EMR 3.x 升级到 4.x/5.x?

于 2017-12-21T16:32:13.500 回答