
My actions:

start_fair_usage finishes with a normal status, but test_copy returns

Main class [org.apache.oozie.action.hadoop.DistcpMain], main() threw exception, null

In /user/comverse/data/${1}_B I have a lot of different files, some of which I want to copy to ${NAME_NODE}/user/evkuzmin/output. To do that, I try to pass paths from copy_files.sh, which collects a set of paths to the files I need.

  <action name="start_fair_usage">
    <shell xmlns="uri:oozie:shell-action:0.1">
      <job-tracker>${JOB_TRACKER}</job-tracker>
      <name-node>${NAME_NODE}</name-node>
      <exec>${copy_file}</exec>      
      <argument>${today_without_dash}</argument>
      <argument>${mta}</argument>
      <!-- <file>${path}#${start_fair_usage}</file> -->
      <file>${path}${copy_file}#${copy_file}</file>
      <capture-output/>
    </shell>
    <ok to="test_copy"/>
    <error to="KILL"/>
  </action>

  <action name="test_copy">
    <distcp xmlns="uri:oozie:distcp-action:0.2">
      <job-tracker>${JOB_TRACKER}</job-tracker>
      <name-node>${NAME_NODE}</name-node>
      <arg>${wf:actionData('start_fair_usage')['paths']}</arg>
      <!-- <arg>${NAME_NODE}/user/evkuzmin/input/*</arg> -->
      <arg>${NAME_NODE}/user/evkuzmin/output</arg>
    </distcp>
    <ok to="END"/>
    <error to="KILL"/>
  </action>

start_fair_usage runs copy_file.sh:

echo ${1} 
echo ${2}

# source directory for the given date (${1}) on platform B
dirs=(
    /user/comverse/data/${1}_B
    )
args=()

# list the directory, keep only the .gz files whose names match ${2},
# and collect the path column (field 8 of `hadoop fs -ls`)
for i in $(hadoop fs -ls "${dirs[@]}" | egrep ${2}.gz | awk -F " " '{print $8}')
do
    args+=("$i")
    echo "copy file - "${i}
done

paths=${args}
echo ${paths}
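
Two details are worth noting here: in bash, paths=${args} expands to only the first element of the array, and Oozie's capture-output only exposes values to wf:actionData when the script prints them to stdout as key=value lines (Java properties format). A minimal sketch of how the last two lines could publish the whole list under a paths key, assuming a single space-joined string is what the downstream action expects:

# join all collected paths with spaces and publish them as a property
# readable via ${wf:actionData('start_fair_usage')['paths']}
paths="${args[*]}"
echo "paths=${paths}"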

1 Answer


Here is what I ended up doing.

  <start to="start_copy"/>

  <fork name="start_copy">
    <path start="copy_mta"/>
    <path start="copy_rcr"/>
    <path start="copy_sub"/>
  </fork>

  <action name="copy_mta">
    <distcp xmlns="uri:oozie:distcp-action:0.2">
      <prepare>
        <delete path="${NAME_NODE}${dstFolder}mta/*"/>
      </prepare>
      <arg>${NAME_NODE}${srcFolder}/*mta.gz</arg>
      <arg>${NAME_NODE}${dstFolder}mta/</arg>
    </distcp>
    <ok to="end_copy"/>
    <error to="KILL"/>
  </action>

  <action name="copy_rcr">
    <distcp xmlns="uri:oozie:distcp-action:0.2">
      <prepare>
        <delete path="${NAME_NODE}${dstFolder}rcr/*"/>
      </prepare>
      <arg>${NAME_NODE}${srcFolder}/*rcr.gz</arg>
      <arg>${NAME_NODE}${dstFolder}rcr/</arg>
    </distcp>
    <ok to="end_copy"/>
    <error to="KILL"/>
  </action>

  <action name="copy_sub">
    <distcp xmlns="uri:oozie:distcp-action:0.2">
      <prepare>
        <delete path="${NAME_NODE}${dstFolder}sub/*"/>
      </prepare>
      <arg>${NAME_NODE}${srcFolder}/*sub.gz</arg>
      <arg>${NAME_NODE}${dstFolder}sub/</arg>
    </distcp>
    <ok to="end_copy"/>
    <error to="KILL"/>
  </action>

  <join name="end_copy" to="END"/>

  <kill name="KILL">
    <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>
  <end name="END"/>

It turned out that wildcards can be used in distcp, so I didn't need bash at all.
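
For comparison, the same glob-based copy could also be run as a one-off DistCp command from a shell; the line below is only an illustration reusing the workflow's placeholders, not something from the original post:

hadoop distcp "${NAME_NODE}${srcFolder}/*mta.gz" "${NAME_NODE}${dstFolder}mta/"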

Also, someone suggested I write it in Scala.

import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, FileUtil}

val conf = new Configuration()
val fs = FileSystem.get(conf)

val listOfFileTypes = List("mta", "rcr", "sub")
val listOfPlatforms = List("B", "C", "H", "M", "Y")

for (fileType <- listOfFileTypes) {
  // clear the destination directory before copying
  // (note: fullyDeleteContents takes a java.io.File, i.e. a local path)
  FileUtil.fullyDeleteContents(new File("/apps/hive/warehouse/arstel.db/fair_usage/fct_evkuzmin/file_" + fileType))
  for (platform <- listOfPlatforms) {
    // expand the glob for this date/platform/file type on HDFS
    val srcPaths = fs.globStatus(new Path("/user/comverse/data/" + "20170404" + "_" + platform + "/*" + fileType + ".gz"))
    val dstPath = new Path("/apps/hive/warehouse/arstel.db/fair_usage/fct_evkuzmin/file_" + fileType)

    for (srcPath <- srcPaths) {
      println("copying " + srcPath.getPath.toString)
      FileUtil.copy(fs, srcPath.getPath, fs, dstPath, false, conf)
    }
  }
}

Both of these work, though I haven't tried running the Scala script from Oozie.

Answered on 2017-04-09T14:35:07.427