我们目前有一个 Python Apache Beam 管道正在工作并且能够在本地运行。我们现在正在让管道在 Google Cloud Dataflow 上运行并完全自动化,但在 Dataflow/Apache Beam 的管道监控中发现了一个限制。
目前,Cloud Dataflow有两种监控管道状态的方法,一种是通过其 UI 界面,另一种是通过命令行中的 gcloud。这两种解决方案都不适用于完全自动化的解决方案,我们可以考虑无损文件处理。
查看 Apache Beam 的 github,他们有一个文件internal/apiclient.py,其中显示有一个用于获取作业状态的函数get_job。
我们发现 get_job 使用的一个实例在runners/dataflow_runner.py中。
最终目标是使用此 API 获取我们自动触发运行的一个或多个作业的状态,以确保它们最终都通过管道成功处理。
任何人都可以向我们解释在我们运行管道(p.run()
)之后如何使用这个 API 吗?我们不明白runner
inresponse = runner.dataflow_client.get_job(job_id)
从哪里来。
如果有人可以更深入地了解我们如何在设置/运行我们的管道时访问此 API 调用,那就太好了!