1

我有一个带有 distCp 操作的工作流,它运行得相当好。但是,现在我正在尝试更改复制策略,并且无法通过操作参数来做到这一点。关于这个主题的文档相当少,查看 distCp 操作执行器的源代码并没有帮助。

如果从命令行运行 distCp,我可以使用命令行参数 -strategy {uniformsize|dynamic}来设置复制策略。

使用该逻辑,我尝试在 oozie 操作中执行此操作。

<action name="distcp-run" retry-max="3" retry-interval="1">
    <distcp xmlns="uri:oozie:distcp-action:0.2">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapreduce.job.queuename</name>
                <value>${poolName}</value>
            </property>
        </configuration>
        <arg>-Dmapreduce.job.queuename=${poolName}</arg>
        <arg>-Dmapreduce.job.name=distcp-s3-${wf:id()}</arg>
        <arg>-update</arg>
        <arg>-strategy dynamic</arg>
        <arg>${region}/d=${day2HoursAgo}/h=${hour2HoursAgo}</arg>
        <arg>${region2}/d=${day2HoursAgo}/h=${hour2HoursAgo}</arg>
        <arg>${region3}/d=${day2HoursAgo}/h=${hour2HoursAgo}</arg>
        <arg>${nameNode}${rawPath}/${partitionDate}</arg>
    </distcp>
    <ok to="join-distcp-steps"/>
    <error to="error-report"/>
</action>

但是,当我执行时操作失败。

从标准输出:

...>>> Invoking Main class now >>>

Fetching child yarn jobs
tag id : oozie-1d1fa70383587ae625b6495e30a315f7
Child yarn jobs are found - 
Main class        : org.apache.hadoop.tools.DistCp
Arguments         :
                    -Dmapreduce.job.queuename=merged
                    -Dmapreduce.job.name=distcp-s3-0000019-160622133128476-oozie-oozi-W
                    -update
                    -strategy dynamic
                    s3a://myfirstregion/d=21/h=17,s3a://mysecondregion/d=21/h=17,s3a://ttv-logs-eu/tsv/clickstream-clean/y=2016/m=06/d=21/h=17,s3a://mythirdregion/d=21/h=17
                    hdfs://myurl:8020/data/raw/2016062117
found Distcp v2 Constructor
                    public org.apache.hadoop.tools.DistCp(org.apache.hadoop.conf.Configuration,org.apache.hadoop.tools.DistCpOptions) throws java.lang.Exception

<<< Invocation of Main class completed <<<

Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.DistcpMain], main() threw exception, Returned value from distcp is non-zero (-1)
java.lang.RuntimeException: Returned value from distcp is non-zero (-1)
    at org.apache.oozie.action.hadoop.DistcpMain.run(DistcpMain.java:66)...

查看系统日志,它似乎抓住了 -strategy 动态并试图将其放入源路径数组中:

2016-06-22 14:11:18,617 INFO [uber-SubtaskRunner] org.apache.hadoop.tools.DistCp: Input Options: DistCpOptions{atomicCommit=false, syncFolder=true, deleteMissing=false, ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', sourceFileListing=null, sourcePaths=[-strategy dynamic, s3a://myfirstregion/d=21/h=17,s3a:/mysecondregion/d=21/h=17,s3a:/ttv-logs-eu/tsv/clickstream-clean/y=2016/m=06/d=21/h=17,s3a:/mythirdregion/d=21/h=17], targetPath=hdfs://myurl:8020/data/raw/2016062117, targetPathExists=true, preserveRawXattrs=false, filtersFile='null'}
2016-06-22 14:11:18,624 INFO [uber-SubtaskRunner] org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at sandbox/10.191.5.128:8032
2016-06-22 14:11:18,655 ERROR [uber-SubtaskRunner] org.apache.hadoop.tools.DistCp: Invalid input: 
org.apache.hadoop.tools.CopyListing$InvalidInputException: -strategy dynamic doesn't exist

所以从 DistCpOptions 有一个 copyStrategy 但它被设置为默认的 uniformsize 值。我首先尝试移动参数,但是两个 -Dmapreduce 参数最终都出现在源路径中(但 -update 没有)。

如何通过 Oozie 工作流配置将复制策略设置为动态?

谢谢。

4

1 回答 1

1

查看代码,似乎无法通过配置来设置策略。distcp-action您可以使用操作而不是使用map-reduce操作,这样您就可以根据需要对其进行配置。

Oozie MapReduce Cookbook有示例。

查看Distcp代码,相关部分在第 237 行附近createJob()

Job job = Job.getInstance(getConf());
job.setJobName(jobName);
job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
job.setJarByClass(CopyMapper.class);
configureOutputFormat(job);
job.setMapperClass(CopyMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(CopyOutputFormat.class);
job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
job.getConfiguration().set(JobContext.NUM_MAPS, String.valueOf(inputOptions.getMaxMaps()));

上面的代码并不是您需要的全部,您需要查看 distcp 源代码才能将它们全部解决。

因此,您需要自己在map-reduce操作中配置所有属性。这样您就可以设置使用InputFormatClassstrategy设置的位置。

您可以在此处查看InputFormatClassdistcp 属性文件中的可用属性。

您需要的是org.apache.hadoop.tools.mapred.lib.DynamicInputFormat.

于 2016-06-23T13:14:26.000 回答