有没有办法从基于 CDAP 事件的触发器运行 Google 数据融合管道?
第一个要求是,每当新文件到达 GCS 存储桶时。它将触发数据融合管道自动运行。
第二个要求是管道依赖,例如管道 A 未启动或失败,管道 B 无法运行。
谢谢
有没有办法从基于 CDAP 事件的触发器运行 Google 数据融合管道?
第一个要求是,每当新文件到达 GCS 存储桶时。它将触发数据融合管道自动运行。
第二个要求是管道依赖,例如管道 A 未启动或失败,管道 B 无法运行。
谢谢
回顾您的初始用例,我假设对于第二个需求,您可能会考虑查看 CDAP 纯组件,例如:Schedules、Workflows和Triggers。
通常,为具有某些条件执行模式的底层管道设计运行流程,您可以通过定义特定的Workflow来创建Schedule对象,该 Workflow包含管道之间条件的逻辑组合,并应用与您的事件发生匹配的触发器模型。
根据 CDAP 文档:
工作流可以由CDAP CLI和Lifecycle HTTP RESTful API 控制。
如上所述,需要根据文档中的示例和我创建的工作流的进一步参考,向 CDAP REST API 编写适当的 HTTP 请求,其中包含存储要创建的计划的详细信息的 JSON 对象,而仅在成功Pipeline_2
时触发:Pipeline_1
{
"name": "Schedule_1",
"description": "Triggers Pipeline_2 on the succeding execution of Pipeline_1",
"namespace": "<Pipeline_2-namespace>",
"application": "Pipeline_2",
"version": "<application version of the Pipeline_2>",
"program": {
"programName": "Workflow_name",
"programType": "WORKFLOW"
},
"trigger": {
"type": "PROGRAM_STATUS",
"programId": {
"namespace": "<Pipeline_1-namespace>",
"application": "Pipeline_1",
"version": "<application version of the Pipeline_1>",
"type": "WORKFLOW",
"entity": "PROGRAM",
"program": "Workflow_name"
},
"programStatuses": ["COMPLETED"]
}
}
对于第一个要求,我不确定在 Data Fusion/CDAP 本机仪器中实现是否可行,而我无法看到此类事件,与 GCS 存储桶的持续发现相匹配:
触发器由事件触发,例如在数据集中创建新分区、执行时间触发器的 cron 表达式或程序状态。
在这种情况下,我会查看 GCP Cloud 函数和 GCP Composer,写得很好的示例,描述了如何将 Cloud Functions 用于基于事件的 DAG 触发器的方式,假设特别是 Composer DAG 文件,您可以调用顺序数据融合管道执行。查看此 Stack线程以获取更多详细信息。