1

有没有办法从基于 CDAP 事件的触发器运行 Google 数据融合管道?

第一个要求是,每当新文件到达 GCS 存储桶时。它将触发数据融合管道自动运行。

第二个要求是管道依赖,例如管道 A 未启动或失败,管道 B 无法运行。

谢谢

4

1 回答 1

2

回顾您的初始用例,我假设对于第二个需求,您可能会考虑查看 CDAP 纯组件,例如:SchedulesWorkflowsTriggers

通常,为具有某些条件执行模式的底层管道设计运行流程,您可以通过定义特定的Workflow来创建Schedule对象,该 Workflow包含管道之间条件的逻辑组合,并应用与您的事件发生匹配的触发器模型。

根据 CDAP 文档:

工作流可以由CDAP CLILifecycle HTTP RESTful API 控制。

如上所述,需要根据文档中的示例和我创建的工作流的进一步参考,向 CDAP REST API 编写适当的 HTTP 请求,其中包含存储要创建的计划的详细信息的 JSON 对象,而仅在成功Pipeline_2时触发:Pipeline_1

{
  "name": "Schedule_1",
  "description": "Triggers Pipeline_2 on the succeding execution of Pipeline_1",
  "namespace": "<Pipeline_2-namespace>",
  "application": "Pipeline_2",
  "version": "<application version of the Pipeline_2>",
  "program": {
    "programName": "Workflow_name",
    "programType": "WORKFLOW"
  },
  "trigger": {
        "type": "PROGRAM_STATUS",
        "programId": {
            "namespace": "<Pipeline_1-namespace>",
            "application": "Pipeline_1",
            "version": "<application version of the Pipeline_1>",
            "type": "WORKFLOW",
            "entity": "PROGRAM",
            "program": "Workflow_name"
        },
        "programStatuses":  ["COMPLETED"]
  }
}

对于第一个要求,我不确定在 Data Fusion/CDAP 本机仪器中实现是否可行,而我无法看到此类事件,与 GCS 存储桶的持续发现相匹配:

触发器由事件触发,例如在数据集中创建新分区、执行时间触发器的 cron 表达式或程序状态。

在这种情况下,我会查看 GCP Cloud 函数和 GCP Composer,写得很好的示例,描述了如何将 Cloud Functions 用于基于事件的 DAG 触发器的方式,假设特别是 Composer DAG 文件,您可以调用顺序数据融合管道执行。查看此 Stack线程以获取更多详细信息。

于 2020-09-09T10:16:19.730 回答