我的用例涉及获取我的项目中存在的所有流式数据流作业的作业 ID 并取消它。更新我的数据流作业的源并重新运行它。
我正在尝试使用 python 来实现这一点。直到现在我还没有遇到任何有用的文档。我想使用 python 的库子进程来执行 gcloud 命令作为一种解决方法。但是我再次无法存储结果并使用它。
有人可以指导我做这件事的最佳方法是什么。
我的用例涉及获取我的项目中存在的所有流式数据流作业的作业 ID 并取消它。更新我的数据流作业的源并重新运行它。
我正在尝试使用 python 来实现这一点。直到现在我还没有遇到任何有用的文档。我想使用 python 的库子进程来执行 gcloud 命令作为一种解决方法。但是我再次无法存储结果并使用它。
有人可以指导我做这件事的最佳方法是什么。
您可以像这样直接使用Dataflow rest api
from google.auth.transport.requests import AuthorizedSession
import google.auth
base_url = 'https://dataflow.googleapis.com/v1b3/projects/'
credentials, project_id = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform'])
project_id = 'PROJECT_ID'
location = 'europe-west1'
authed_session = AuthorizedSession(credentials)
response = authed_session.request('GET', f'{base_url}{project_id}/locations/{location}/jobs')
print(response.json())
您必须导入 google-auth 依赖项。
您还可以添加查询参数?filter=ACTIVE以仅获取可与您的流作业匹配的活动数据流。
除了直接使用其余 API 之外,您还可以在google-api-python-client中为 API 使用生成的 Python 绑定。对于简单的调用,它不会增加太多价值,但是当传入许多参数时,它比原始 HTTP 库更容易使用。
使用该库,作业列表调用看起来像
from googleapiclient.discovery import build
import google.auth
credentials, project_id = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform'])
df_service = build('dataflow', 'v1b3', credentials=credentials)
response = df_service.projects().locations().jobs().list(
project_id=project_id,
location='<region>').execute()