我想运行包含数千行总结果的 BigQuery 查询,但我只想一次检索一页 100 个结果(使用maxResults
andpageToken
参数)。
BigQuery API 支持pageToken
在方法上使用参数collection.list
。但是,我正在运行异步查询并使用该getQueryResult
方法检索结果,它似乎不支持该pageToken
参数。s可以和pageToken
s 一起使用getQueryResults
吗?
我想运行包含数千行总结果的 BigQuery 查询,但我只想一次检索一页 100 个结果(使用maxResults
andpageToken
参数)。
BigQuery API 支持pageToken
在方法上使用参数collection.list
。但是,我正在运行异步查询并使用该getQueryResult
方法检索结果,它似乎不支持该pageToken
参数。s可以和pageToken
s 一起使用getQueryResults
吗?
更新:这里有关于如何翻阅列表结果的新文档。
我正在自我回答这个问题,因为一位开发人员私下问过我这个问题,我想在 Stack Overflow 上分享答案。
当从 Tabledata.list 方法请求分页结果时,可以使用 pageToken 参数。例如,当结果数据超过 100k 行或 10 MB 结果时,结果集会自动分页。您还可以通过显式设置 maxResults 参数来请求结果分页。每页结果都会返回一个 pageToken 参数,然后可以使用该参数来检索下一页结果。
每个查询都会生成一个新的 BigQuery 表。如果您没有明确命名该表,它只会持续 24 小时。然而,即使是未命名的“匿名”表也有一个标识符。无论哪种情况,在插入查询作业后,检索新创建的表的名称。然后使用 tabledata.list 方法(以及 maxResults/pageToken 参数的组合)以分页形式请求结果。使用之前检索到的 pageToken 循环并继续调用 tabledata.list,直到不再返回 pageTokens(意味着您已到达最后一页。
使用适用于 Java 的 Google API 客户端库,用于插入查询作业、轮询查询完成以及检索一页又一页查询结果的代码可能如下所示:
// Create a new BigQuery client authorized via OAuth 2.0 protocol
// See: https://developers.google.com/bigquery/docs/authorization#installed-applications
Bigquery bigquery = createAuthorizedClient();
// Start a Query Job
String querySql = "SELECT TOP(word, 500), COUNT(*) FROM publicdata:samples.shakespeare";
JobReference jobId = startQuery(bigquery, PROJECT_ID, querySql);
// Poll for Query Results, return result output
TableReference completedJob = checkQueryResults(bigquery, PROJECT_ID, jobId);
// Return and display the results of the Query Job
displayQueryResults(bigquery, completedJob);
/**
* Inserts a Query Job for a particular query
*/
public static JobReference startQuery(Bigquery bigquery, String projectId,
String querySql) throws IOException {
System.out.format("\nInserting Query Job: %s\n", querySql);
Job job = new Job();
JobConfiguration config = new JobConfiguration();
JobConfigurationQuery queryConfig = new JobConfigurationQuery();
config.setQuery(queryConfig);
job.setConfiguration(config);
queryConfig.setQuery(querySql);
Insert insert = bigquery.jobs().insert(projectId, job);
insert.setProjectId(projectId);
JobReference jobId = insert.execute().getJobReference();
System.out.format("\nJob ID of Query Job is: %s\n", jobId.getJobId());
return jobId;
}
/**
* Polls the status of a BigQuery job, returns TableReference to results if "DONE"
*/
private static TableReference checkQueryResults(Bigquery bigquery, String projectId, JobReference jobId)
throws IOException, InterruptedException {
// Variables to keep track of total query time
long startTime = System.currentTimeMillis();
long elapsedTime;
while (true) {
Job pollJob = bigquery.jobs().get(projectId, jobId.getJobId()).execute();
elapsedTime = System.currentTimeMillis() - startTime;
System.out.format("Job status (%dms) %s: %s\n", elapsedTime,
jobId.getJobId(), pollJob.getStatus().getState());
if (pollJob.getStatus().getState().equals("DONE")) {
return pollJob.getConfiguration().getQuery().getDestinationTable();
}
// Pause execution for one second before polling job status again, to
// reduce unnecessary calls to the BigQUery API and lower overall
// application bandwidth.
Thread.sleep(1000);
}
}
/**
* Page through the result set
*/
private static void displayQueryResults(Bigquery bigquery,
TableReference completedJob) throws IOException {
long maxResults = 20;
String pageToken = null;
int page = 1;
// Default to not looping
boolean moreResults = false;
do {
TableDataList queryResult = bigquery.tabledata().list(
completedJob.getProjectId(),
completedJob.getDatasetId(),
completedJob.getTableId())
.setMaxResults(maxResults)
.setPageToken(pageToken)
.execute();
List<TableRow> rows = queryResult.getRows();
System.out.print("\nQuery Results, Page #" + page + ":\n------------\n");
for (TableRow row : rows) {
for (TableCell field : row.getF()) {
System.out.printf("%-50s", field.getV());
}
System.out.println();
}
if (queryResult.getPageToken() != null) {
pageToken = queryResult.getPageToken();
moreResults = true;
page++;
} else {
moreResults = false;
}
} while (moreResults);
}