6

是否可以将选项从 azkaban 工作流程传递到底层工作代码?

我有这样的东西,它适用于硬编码/预先知道的日期,但我希望可以选择在执行流程时指定日期:

from azkaban import Job, Project
import datetime
import os
from datetime import datetime, timezone, timedelta




options = {
            'start.date' : today.strftime('%Y-%m-%d'), # Can we pass this as an argument to the underlying code?
            'day.offset' : 1
            }

project = Project('my_project',root=__file__)
project.add_file('my_shell_script.sh', 'my_shell_script.sh')
project.add_job('my_job', Job(options, {'type' : 'command' : 'bash my_shell_script <pass date here?>'}))
project.add_job('my_job', Job(options, {'type' : 'command' : 'java -jar test.jar <pass date here?>'}))

谢谢,沙拉特

4

3 回答 3

5

大图:通过将参数写入磁盘使参数持久化

在 Azkaban 流中的不相邻作业之间传递参数的一种方法是在需要参数之前对 JOB_OUTPUT_PROP_FILE 进行操作。必须使用 shell 脚本执行此操作,因为 JOB_OUTPUT_PROP_FILE 变量不能直接用于给定作业。 这种方法将相关信息写入文件,并在需要使用帮助脚本之前读取它。参数可以通过在每一步写入 JOB_OUTPUT_PROP_FILE 来传递给相邻的作业。

作业之间传递参数图


概念

  1. 将包含您希望传递的信息的文件写入磁盘
  2. 创建一个在需要参数的作业之前运行的助手/准备 shell 脚本
  3. 在你的工作中使用参数

例子

在流中第一个作业运行的日期需要被后一个作业使用的场景中,首先将相关数据写入文件。在此示例中,将 YYYY-MM-DD 格式的当前日期写入名为 rundate.text 的本地文件

#step_1.job
type=command
dependencies=initialize_jobs
command=whoami
command.1=/bin/sh -c "date '+%Y-%m-%d' > rundate.text"

然后,就在需要参数之前,运行准备脚本以使参数可用。

#step_4_preparation.job
type=command
dependencies=step_3
command=whoami
command.1=/bin/bash rd.sh

第4步准备执行以下shell脚本(rd.sh)

#!/bin/sh
# this script takes the run_date value from the text file and passes it to Azkaban
# Now, the variable can be used in the next step of the job

RD=$(cat rundate.text)

echo "Now setting Job Output Property File with RD (run date) variable"
echo $RD


#This is the point where the parameter is written in JSON format
#to the JOB_OUTPUT_PROP_FILE, which allows it to be used in the next step
echo '{"RD" : "'"$RD"'"}' > "${JOB_OUTPUT_PROP_FILE}"

然后,在接下来的步骤中,可以使用该参数,在本例中为 ${RD}。

# step_4.job
type=command
dependencies=step_4_preparation
command=whoami
command.1=bash -c "echo ${RD} is the run date"
于 2017-07-19T20:31:19.123 回答
2

出色地,

根据 azkaban doc,可以覆盖唯一的全局流属性。在python中,我们可以这样设置全局属性:

    project = Project('emr-cluster-creation', root=__file__)

project.properties = {
                                                    'emr.cluster.name' : 'default-clustername',
                                                    'emr.master.instance.type' : 'r3.xlarge',
                                                    'emr.core.instance.type' : 'r3.xlarge',
                                                    'emr.task.instance.type' : 'r3.xlarge',
                                                    'emr.instance.count' : 11,
                                                    'emr.task.instance.count' : 5,
                                                    'emr.hadoop.conf.local.path' : 's3n://bucket/hadoop-configuration.json',
                                                    'emr.hive.site.s3.path' : 's3n://bucket/hive-site.xml',
                                                    'emr.spark.version' : '1.4.0.b',
#                                                   'emr.service.role' : 'some-role', #amazon IAM role.
                                                    'emr.service.role' : '\'\'', #amazon IAM role.
                                                    'emr.cluster.output' : '\'\''
    }

# do something...

这些参数可以作为 ${emr.cluster.name} 传递给底层应用程序/脚本。这将支持传递默认属性值和覆盖 azkaban 服务器 web-ui 上的流参数或使用 azkaban ajax API。

于 2015-07-12T19:29:11.440 回答
1

正如eeasterly所说,正确的方法是使用JOB_OUTPUT_PROP_FILE但不是将其持久化到文件系统,我相信最好使用它传递给所有依赖项的特征(创建流程>作业配置>参数输出>“属性可以被导出以传递给它的依赖项”)。

要利用此功能,只需使需要导出参数的作业依赖于导出它们的作业。在 eeasterly 的情况下,丢弃中间体step 4 preparation,只使step 4依赖于step 1.

于 2018-08-24T11:40:43.620 回答