2

我正在尝试将 Elastic Data Pipeline 与 shell 命令活动一起使用来处理一些数据文件。具体来说,我正在尝试使用 python 来处理一些分阶段的数据。

我对 python 脚本的第一次尝试看起来像:

#!/usr/bin/env python
import os

print "We've entered the python file"
print os.curdir
print os.listdir(os.curdir)
print ${INPUT1_STAGING_DIR}

由于 ${INPUT1_STAGING_DIR} 未定义,此代码在命中第四个打印语句时引发错误。

之后,我尝试将 ${INPUT1_STAGING_DIR} 作为参数传递给脚本,并将脚本修改为:

#!/usr/bin/env python
import os
import sys

print "We've entered the python file"
print os.curdir
print os.listdir(os.curdir)
print sys.argv

这次脚本成功完成并具有以下输出。

我们已经输入了python文件
.
['taskRunner.zip'、'mysql-connector-java-bin.jar'、'csv-serde.jar'、'run.sh'、'TaskRunner-1.0.jar'、'pipeline-serde.jar'、' run.out', '输出']
['/mnt/taskRunner/output/tmp/ActivityIdQC6BK20140722T170548Attempt1_command.sh', '${INPUT1_STAGING_DIR}']

同样, ${INPUT1_STAGING_DIR} 没有“替代”任何类似于我正在暂存的文件的东西。我正在尝试做的事情可能吗?如果没有,有没有好的解决方法?

4

2 回答 2

4

事实证明 ${INPUT1_STAGING_DIR} 被设置为环境变量,可以这样访问。以下脚本适用于我:

#!/usr/bin/python

import os
import sys

print "We've entered the python file"

print os.curdir
print os.listdir(os.curdir)

inputDir = os.getenv('INPUT1_STAGING_DIR')
print inputDir
print os.path.isfile(inputDir)
print os.path.isdir(inputDir)
print os.listdir(inputDir)
于 2014-07-23T17:23:00.483 回答
1

您必须设置 Stage = true。此选项在 ShellCommandActivity 活动中作为下拉字段提供。

具体引用文档: http ://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-concepts-staging.html

在资源上本地暂存数据 输入数据自动复制到资源本地文件系统中。输出数据自动从资源本地文件系统复制到输出数据节点。例如,当您使用 staging = true 配置 ShellCommandActivity 输入和输出时,输入数据以 INPUTx_STAGING_DIR 的形式提供,输出数据以 OUTPUTx_STAGING_DIR 的形式提供,其中 x 是输入或输出的数量。

希望这可以解决您的原始问题。

于 2015-04-06T13:44:47.177 回答