2

我的用例涉及获取我的项目中存在的所有流式数据流作业的作业 ID 并取消它。更新我的数据流作业的源并重新运行它。

我正在尝试使用 python 来实现这一点。直到现在我还没有遇到任何有用的文档。我想使用 python 的库子进程来执行 gcloud 命令作为一种解决方法。但是我再次无法存储结果并使用它。

有人可以指导我做这件事的最佳方法是什么。

4

2 回答 2

3

您可以像这样直接使用Dataflow rest api

    from google.auth.transport.requests import AuthorizedSession
    import google.auth

    base_url = 'https://dataflow.googleapis.com/v1b3/projects/'

    credentials, project_id = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform'])
    project_id = 'PROJECT_ID'
    location = 'europe-west1'
    authed_session = AuthorizedSession(credentials)
    response = authed_session.request('GET', f'{base_url}{project_id}/locations/{location}/jobs')
    print(response.json())

您必须导入 google-auth 依赖项。

您还可以添加查询参数?filter=ACTIVE以仅获取可与您的流作业匹配的活动数据流。

于 2020-07-20T13:33:13.503 回答
3

除了直接使用其余 API 之外,您还可以在google-api-python-client中为 API 使用生成的 Python 绑定。对于简单的调用,它不会增加太多价值,但是当传入许多参数时,它比原始 HTTP 库更容易使用。

使用该库,作业列表调用看起来像

from googleapiclient.discovery import build
import google.auth
credentials, project_id = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform'])
df_service = build('dataflow', 'v1b3', credentials=credentials)
response = df_service.projects().locations().jobs().list(
  project_id=project_id,
  location='<region>').execute()
于 2020-07-20T22:54:24.073 回答