
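I have a Java application running on Google App Engine with SDK version 1.9.36. It runs on Java 7 and accesses Datastore, BigQuery and Cloud Storage. The container is a B8-class backend server. Tasks come in through a receiving servlet, which performs some basic checks and then submits a TaskQueue entry.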

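The problem I'm facing is that the application simply stops responding. It reads data from a BigQuery table using JobQueue. While it checks whether the job status has changed from RUNNING to DONE, the application just stops logging and processing. The number of wait iterations before this happens varies, and so does the point where it stops.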
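A wait loop like this (and `pollJobStatus` in the posted code) has no exit other than DONE, so if the job never reports DONE nothing ends the wait. A minimal sketch of a bounded alternative, assuming a `Callable<String>` that wraps the `jobs().get(...).execute()` call and returns the state string; the class name `BoundedPoller` and the timeout and backoff values are illustrative, not part of the original code. Note that this bounds the waiting between polls, not a single `execute()` call that blocks.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

// Sketch: poll a job state until it is "DONE", but give up after a deadline instead of looping forever.
// The Callable is a hypothetical stand-in for bigquery.jobs().get(...).execute().getStatus().getState().
final class BoundedPoller {

    private BoundedPoller() {
    }

    static void waitForDone(Callable<String> stateFetcher, long timeoutMillis,
                            long initialSleepMillis, long maxSleepMillis) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        long sleep = initialSleepMillis;
        while (true) {
            String state = stateFetcher.call();
            // "DONE".equals(...) is null-safe, unlike state.equals("DONE").
            if ("DONE".equals(state)) {
                return;
            }
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException("Job not DONE after " + timeoutMillis + " ms, last state " + state);
            }
            // Never sleep past the deadline, then double the interval up to maxSleepMillis.
            Thread.sleep(Math.min(sleep, remaining));
            sleep = Math.min(sleep * 2, maxSleepMillis);
        }
    }
}
```

The exponential backoff keeps API calls low for long-running jobs, while the deadline turns a stuck wait into an exception that shows up in the logs.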

It isn't consistent, so I can't pin it down to a specific piece of code.

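Sometimes the application runs for several days without any problem; on other days I can't get through even a single iteration without it stopping.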

The table holds between 600 and 6,000 rows. I read 2,000 rows at a time.
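With 2,000 rows per request, the number of `tabledata().list()` pages is a ceiling division. A small sketch, assuming every page except the last returns the full 2,000 rows (the service can return fewer rows per page than requested, so this is a lower bound on the number of requests); `PageMath` is a hypothetical helper name.

```java
// Sketch: number of pages needed to read totalRows with pageSize rows per page.
final class PageMath {

    private PageMath() {
    }

    /** Ceiling division: pages needed when each full page holds pageSize rows. */
    static long pagesNeeded(long totalRows, long pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        return (totalRows + pageSize - 1) / pageSize;
    }

    public static void main(String[] args) {
        System.out.println(pagesNeeded(600, 2000));  // 1
        System.out.println(pagesNeeded(6000, 2000)); // 3
    }
}
```

So each run makes between one and three (or more) paging requests, on top of the polling calls.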

Sometimes it reaches the "Job done - let process results!" log line and then simply stops. Sometimes it logs a few RUNNING messages and then stops.
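Both stopping points sit right before a blocking `execute()` call (the `jobs().get(...)` after "Job done", or the next poll after a RUNNING message), and the Java client's blocking calls only give up if a timeout is configured. The more direct option is to set connect and read timeouts on the client's `HttpRequestInitializer` (`HttpRequest.setConnectTimeout` / `setReadTimeout`). As an alternative, here is a minimal sketch of a pure-JDK watchdog that stops waiting on a call after a timeout. On App Engine standard for Java 7, as I understand the runtime, threads have to come from `com.google.appengine.api.ThreadManager` (for example `ThreadManager.currentRequestThreadFactory()`), so the sketch takes the `ExecutorService` as a parameter instead of creating one; `CallWithTimeout` is a hypothetical name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: run a potentially blocking call on a worker thread and stop waiting after a timeout,
// so a stuck remote call surfaces as a TimeoutException instead of a silent hang.
final class CallWithTimeout {

    private CallWithTimeout() {
    }

    static <T> T call(ExecutorService executor, Callable<T> task, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            // Interrupt the worker; whether the blocked call actually stops depends on the call itself.
            future.cancel(true);
            throw e;
        }
    }
}
```

Even if the interrupted call keeps running, the request thread gets control back and can log where it was stuck, which is exactly the information missing when the logs simply stop.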

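This is the part where I set the job parameters and start the job. I have left out the code inside the do{}while loop; that is only the processing part, and it works fine. The problem seems to be somewhere in retrieving the data.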
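In the excerpt as posted, `pageToken` and `moreResults` never change inside the do{}while loop, so it runs exactly once; the omitted processing may well update them. For reference, a minimal sketch of the usual page-token loop. `Page` and `PageSource` are hypothetical stand-ins for `tabledata().list(...).setPageToken(token).execute()`; with the real client, the next token would come from `TableDataList.getPageToken()` and the rows from `getRows()`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of the page-token loop pattern with hypothetical Page/PageSource types (not BigQuery classes).
final class PageTokenLoop {

    /** One page of results plus the token for the next page (null when there are no more pages). */
    static final class Page {
        final List<String> rows;
        final String nextPageToken;

        Page(List<String> rows, String nextPageToken) {
            this.rows = rows;
            this.nextPageToken = nextPageToken;
        }
    }

    /** Hypothetical fetcher: returns the page for pageToken (null means the first page). */
    interface PageSource {
        Page fetch(String pageToken) throws IOException;
    }

    static List<String> readAll(PageSource source) throws IOException {
        List<String> all = new ArrayList<>();
        String pageToken = null;
        boolean moreResults;
        do {
            Page page = source.fetch(pageToken);
            if (page.rows != null) {
                all.addAll(page.rows);
            }
            // Advance to the next page; stop when the service returns no token.
            pageToken = page.nextPageToken;
            moreResults = pageToken != null;
        } while (moreResults);
        return all;
    }
}
```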

public List<String> retrieveMergedTableTxIds(Date runDate) throws ProcessorCustomException {

    List<String> existingIds = new ArrayList<>();

    BigQueryQryStringBuilder queryStringBuilder = new BigQueryQryStringBuilderImpl();

    String tempTableName = queryStringBuilder.buildTempTableName();

    String qryString;

    try {
        qryString = queryStringBuilder.buildMergeTableQuery(ApplicationConstants.projectName, ApplicationConstants.DATASET_ID,
                ApplicationConstants.TRXID_VIEW, runDate);

        logger.info("Query string is |" + qryString);

        JobQueryParameters jobQueryParameters = new JobQueryParameters();
        jobQueryParameters.setBigquery(bigquery);
        jobQueryParameters.setProjectId(ApplicationConstants.projectName);
        jobQueryParameters.setDatasetId(ApplicationConstants.DATASET_ID);
        jobQueryParameters.setQuerySql(qryString);
        jobQueryParameters.setTempTableName(tempTableName);


        JobReference jobId = startQuery(jobQueryParameters);

        logger.fine("JobID for submitted job is " + jobId.getJobId());
        logger.fine("Polling job for DONE status");

        TableReference completedJob = pollJobStatus(jobQueryParameters.getBigquery(), ApplicationConstants.projectName, jobId);

        logger.fine("Job done - let process results!");

        Job job = jobQueryParameters.getBigquery().jobs().get(ApplicationConstants.projectName, jobId.getJobId()).execute();

        logger.fine("JobID is " + job.getId());

        GetQueryResultsResponse response = jobQueryParameters.getBigquery().jobs()
                .getQueryResults(ApplicationConstants.projectName, job.getJobReference().getJobId()).execute();

        logger.fine("Response total rows is " + response.getTotalRows());

        // Default to not looping
        boolean moreResults = false;
        String pageToken = null;

        do {
            logger.fine("Inside the per-token do-while loop");

            TableDataList queryResult = jobQueryParameters.getBigquery().tabledata()
                    .list(completedJob.getProjectId(), completedJob.getDatasetId(), completedJob.getTableId())
                    .setMaxResults(ApplicationConstants.MAX_RESULTS).setPageToken(pageToken).execute();

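            // Check for null before dereferencing queryResult.
            if (queryResult != null) {
                logger.info("Value for isEmpty is " + queryResult.isEmpty());

                // getRows() holds this page's rows; size() on the response object counts JSON fields, not rows.
                if (queryResult.getRows() != null) {
                    logger.fine("Row size for token is " + queryResult.getRows().size());
                }
            }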


        } while (moreResults);

        // ... remainder of retrieveMergedTableTxIds (catch block, return of existingIds) omitted ...

public TableReference pollJobStatus(Bigquery bigquery, String projectId, JobReference jobId) throws IOException,
        InterruptedException {

    while (true) {
        Job pollJob = bigquery.jobs().get(projectId, jobId.getJobId()).execute();

        logger.info("Job status for JobId " + pollJob.getId() + " is " + pollJob.getStatus().getState());

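        if ("DONE".equals(pollJob.getStatus().getState())) {
            // A DONE job can still have failed; surface the error instead of reading results.
            if (pollJob.getStatus().getErrorResult() != null) {
                throw new IOException("Job failed: " + pollJob.getStatus().getErrorResult().getMessage());
            }
            logger.info("Returning the TableReference in pollJobStatus");
            return pollJob.getConfiguration().getQuery().getDestinationTable();
        }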
        // Pause for two seconds before polling the job status again, to
        // reduce unnecessary calls to the BigQuery API and lower overall
        // application bandwidth.
        Thread.sleep(2000);
    }
}

public JobReference startQuery(JobQueryParameters jobQueryParameters) throws IOException {

    Job job = new Job();
    JobConfiguration config = new JobConfiguration();
    JobConfigurationQuery queryConfig = new JobConfigurationQuery();

    queryConfig.setAllowLargeResults(true);

    TableReference reference = new TableReference();
    reference.setProjectId(jobQueryParameters.getProjectId());
    reference.setDatasetId(jobQueryParameters.getDatasetId());
    reference.setTableId(jobQueryParameters.getTempTableName());

    Table table = new Table();
    table.setId(jobQueryParameters.getTempTableName());
    // expirationTime is in milliseconds since the epoch: 360000 ms = 6 minutes from now.
    table.setExpirationTime(Calendar.getInstance().getTimeInMillis() + 360000L);
    table.setTableReference(reference);

    jobQueryParameters.getBigquery().tables()
            .insert(jobQueryParameters.getProjectId(), jobQueryParameters.getDatasetId(), table).execute();

    queryConfig.setDestinationTable(reference);

    config.setQuery(queryConfig);

    job.setConfiguration(config);

    queryConfig.setQuery(jobQueryParameters.getQuerySql());

    Insert insert = jobQueryParameters.getBigquery().jobs().insert(jobQueryParameters.getProjectId(), job);
    insert.setProjectId(jobQueryParameters.getProjectId());

    JobReference jobId = insert.execute().getJobReference();

    return jobId;
}

Looking at the App Engine console, it still shows the instance as running, but the graph of processed requests has stopped as well.

Has anyone had a similar experience, where the behavior is this erratic without any code change or redeployment?
