1

我有一份工作 A 需要调用 Azkaban 流“F”作为依赖项。如何提及作业 A 对流 F 的依赖性?

这是我现在与获取远程存储流“F”相关的内容:

session = remote.Session("user@https://AZKABANURL")
workflows = session.get_workflows("FlowFProjectName")
flows = workflows[u"flows"]
flow_id = flows[0]["flowId"]

workflows = session.get_workflow_info("FlowFProjectName", flow_id)
node_id = workflows["nodes"][0]["id"]

现在我有了 node_id ,它是流 F 中最后一个作业的名称,如何在作业 A 中添加流 F 的依赖项?是这样吗?

jobs["A"] = {
    "type": "command", 
    "command": 'echo "Hello World"', 
    "dependencies": "F"
}

执行以下操作会使我在上传到 Azkaban 时出错(通过将此作业 A 捆绑到项目中):

jobs["a"] = Job({"type": "command", "command": 'echo "Hello World"',"dependencies": node_id})

这是错误:

azkaban.util.AzkabanError: Installation Failed.
Error found in upload. Cannot upload.
a cannot find dependency <node_id>

在这里,node_id 是我隐藏的作业的实际名称。

有人可以建议我在工作中添加这些对外部流程的依赖吗?外部流程在 Azkaban 上(这就是我必须使用 Azkaban.remote 的原因)。

4

1 回答 1

0

我找到了我的问题的答案:

  1. 调用远程流并等待它完成(在一个while循环中)
  2. 使用让流程 F 将其称为相关工作的启动。

选项 1:哪个更容易理解 - 您使用 while 循环不断询问 Azkaban 特定作业/流程是否仍在运行。但是在这样做时,您应该让您的 while 循环运行数小时和数小时 + 检查流程是否正在运行的方式是使用 get_running_workflows() 方法。此方法不返回流的某个实例是否仍在运行,而仅返回所述流的任何实例是否正在运行。

选项 2:如果流程 F 以作业 f 结束,并且在流程 F 执行完成后需要运行作业 A,则将作业添加到流程 F 的末尾,例如 f',以便 f' 将调用作业 A。

如果这很难理解:

原始作业图: Flow F,作业 A 所依赖的流:f1 -> f2 -> ... f

添加启动作业后: Flow f': f1 -> f2 -> ... f -> f'

在这里,f' 应包括一个session.run_workflow(project_A, flow_A)

这是比选项 1 更好的方法,因为您确定只有在 Flow f 成功执行完成后才会启动作业 A。我希望这对将来的某人有所帮助。

于 2018-08-30T04:59:42.503 回答