我正在尝试在我的 Spring 批处理作业中生成一个序列文件以传递给 Hadoop map/reduce。通过手动将文件复制到 hdfs 上,我设法让这项工作开始工作。当它在我的本地系统测试中运行时,它运行良好,因为本地文件系统找到了该文件。但是当我尝试将它部署到远程 Hadoop 实例时,我得到了以下异常。
org.apache.hadoop.mapreduce.lib.input.InvalidInputException:输入路径不存在:hdfs://ngram-test:9000/user/hduser/DocumentsPTOgrants2007_2011.seq 在 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:224) 在 org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:55) 在 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:241) 在 org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:885) 在 org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:779) 在 org.apache.hadoop.mapreduce.Job.submit(Job.java:432) 在 org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447) 在 com.atsid.hadoop.jobs.AbstractJobRunner.executeJob(AbstractJobRunner.java:70) 在 com.atsid.hadoop.jobs.AbstractJobRunner.run(AbstractJobRunner.java:36) 在 org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65) 在 org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79) 在 com.atsid.cloudbase.ngram.ingest.mapreduce.NGramIngestJobRunner.main(NGramIngestJobRunner.java:34) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 在 java.lang.reflect.Method.invoke(Method.java:597) 在 org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:191) 在 org.springframework.data.hadoop.mapreduce.JarExecutor.invokeTargetObject(JarExecutor.java:71) 在 org.springframework.data.hadoop.mapreduce.HadoopCodeExecutor.invokeTarget(HadoopCodeExecutor.java:185) 在 org.springframework.data.hadoop.mapreduce.HadoopCodeExecutor.runCode(HadoopCodeExecutor.java:102) 在 org.springframework.data.hadoop.mapreduce.JarTasklet.execute(JarTasklet.java:32) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 在 java.lang.reflect.Method.invoke(Method.java:597) 在 org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) 在 org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183) 在 org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150) 在 org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132) 在 org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120) 在 org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) 在 org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204) 在 com.sun.proxy.$Proxy43.execute(未知来源) 在 org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:386) 在 org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:131) 在 org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:264) 在 org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76) 在 org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367) 在 org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214) 在 org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143) 在 org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:250) 在 org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:195) 在 org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135) 在 org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61) 在 org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60) 在 org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144) 在 org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124) 在 org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135) 在 org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:293) 在 org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120) 在 org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:49) 在 org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114) 在 org.springframework.batch.core.launch.support.CommandLineJobRunner.start(CommandLineJobRunner.java:349) 在 org.springframework.batch.core.launch.support.CommandLineJobRunner.main(CommandLineJobRunner.java:574)
这是该步骤使用的 tasklet 配置。我正在尝试使用该files
属性将输入文件传递给 HDFS。该文件出现在 Hadoop 日志中。
<hdp:jar-tasklet id="ingestJarTasklet" scope="step"
jar="file:${ingest.job.jar.path}"
main-class="com.atsid.cloudbase.ngram.ingest.mapreduce.NGramIngestJobRunner"
libs="${ingest.job.libs}"
files="#{seqFileLocation.URI.toString()}"
configuration-ref="hadoopConfiguration">
ngram.jobrunner.input.document.sequence.file=${job.file.location}#{seqFileLocation.filename}
</hdp:jar-tasklet>