伙计们,以下 Python 脚本运行失败,显示的状态信息为:
job state = FAILED
和
Last State Change: Access denied checking streaming input path: s3n://elasticmapreduce/samples/wordcount/input/
代码如下:
import boto
import boto.emr
from boto.emr.step import StreamingStep
from boto.emr.bootstrap_action import BootstrapAction
import time
# Launch a one-step EMR streaming word-count job flow and poll it until it
# reaches a final state, then report where the output was written.

# Bucket that receives both the job-flow logs and the streaming step's output.
S3_BUCKET = "mytesetbucket123asdf"

# Key prefix of the step's output inside S3_BUCKET. Defined once so the step
# definition and the final messages always refer to the same location.
OUTPUT_PREFIX = "wordcount/output/2013-10-25"

# States after which polling stops.
# NOTE(review): assumed to be the job-flow states that never change again;
# confirm against the EMR job-flow state documentation.
TERMINAL_STATES = (u'COMPLETED', u'FAILED', u'TERMINATED')

# Seconds to wait between two status polls.
POLL_INTERVAL_SECONDS = 10

conn = boto.connect_emr()

step = StreamingStep(
    name='Wordcount',
    mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
    reducer='aggregate',
    # NOTE(review): the reported failure is "Access denied checking streaming
    # input path" for this URI. Nothing in this script configures S3
    # permissions, so verify that the credentials/region used by the job flow
    # can read this bucket.
    input='s3n://elasticmapreduce/samples/wordcount/input/',
    output='s3n://' + S3_BUCKET + '/' + OUTPUT_PREFIX)

jobid = conn.run_jobflow(
    name="test",
    log_uri="s3://" + S3_BUCKET + "/logs/",
    # A real boolean: the string "True" only worked because any non-empty
    # string is truthy (so "False" would have enabled the flag as well).
    visible_to_all_users=True,
    steps=[step])

jobflow = conn.describe_jobflow(jobid)
state = jobflow.state
print("job state = %s" % state)
print("job id = %s" % jobid)

# Stop on any final state, not only COMPLETED; otherwise a FAILED job flow
# would keep this loop polling forever.
while state not in TERMINAL_STATES:
    print(time.localtime())
    time.sleep(POLL_INTERVAL_SECONDS)
    # One describe call per poll; reuse the result for both state and dump.
    jobflow = conn.describe_jobflow(jobid)
    state = jobflow.state
    print(jobflow)
    print("job state = %s" % state)
    print("job id = %s" % jobid)

if state != u'COMPLETED':
    # NOTE(review): `laststatechangereason` is presumed to be the attribute
    # boto exposes for the "Last State Change" message; getattr keeps the
    # script working if it is absent.
    reason = getattr(jobflow, 'laststatechangereason', 'unknown')
    raise SystemExit(
        "job flow %s ended in state %s: %s" % (jobid, state, reason))

print("final output can be found in s3://" + S3_BUCKET + "/" + OUTPUT_PREFIX)
print("try: $ s3cmd sync s3://" + S3_BUCKET + "/" + OUTPUT_PREFIX + " .")