0

我一直在使用google_cloud提交作业来big query提取文件,GCS如下所示:

dataset = self.bqClient.dataset(self.website_id)
table = dataset.table(table_name)
job_name = str(uuid.uuid4())
job = self.bqClient.extract_table_to_storage(
job_name, table, destination)
job.destination_format = "NEWLINE_DELIMITED_JSON"
job.compression = 'GZIP'
job.begin()

这里的工作id或者nameuuid4。基本上我正在收集这些工作idsqueue并想稍后检查它job是否处于DONE状态。我怎样才能做到这一点?

我一直在寻找这个,但到目前为止还没有运气。我只能找到该功能 -client.list_jobs()但它是所有jobs. 我只想querysearch为一个特定的job

4

1 回答 1

2

您可以通过name属性过滤掉您想要的工作。

假设您想获取有关id“big name string job 1”的作业的信息。您可以通过运行将其从作业列表中过滤掉:

job_name = "big name string job 1"
job = [job for job in list(self.bqClient.list_jobs()) if job.name == job_name][0] # this will break if list is empty

print(job.state) # DONE or RUNNING

如果可能,请确保更新客户端,目前我们是 ate 版本0.26.0

[编辑]:

您在评论中说拥有 100 万个工作岗位。就运行get job方法而言,目前 API 仅在运行existsreload方法时才会这样做,因此无法运行类似client.get_job(job_name).

不过,在 中list_job,您可以发送参数all_usersstate_filter就像在代码中一样:

job_name = "big name string job 1"
job = [job for job in list(self.bqClient.list_jobs(all_users=False, state_filter='done')) if job.name == job_name][0]

其中仅列出具有给定状态的客户端中授权的当前用户的作业。

如果仍然列出数百万,那么您仍然可以(有点“hacky”解决方案)直接从作业基础构造函数查询它,例如:

from google.cloud.bigquery.job import _AsyncJob
job = _AsyncJob(job_name, self.bqClient)
job.reload()
print(job.state) #RUNNING or DONE

这是运行client.get_job().

这可能是对 python 存储库的一个有趣的功能请求。

于 2017-08-19T17:03:35.653 回答