1

我正在开发一个使用 Bigquery 作为分析引擎的 Java 应用程序。能够使用Insert a Query Job上的代码运行查询作业(并获得结果) 。必须使用stackoverflow 上的此评论修改代码以使用服务帐户。

现在,需要运行提取作业以将表导出到 GoogleStorage 上的存储桶。基于Exporting a Table,能够修改 Java 代码以插入提取作业(代码如下)。运行时,提取作业的状态从 PENDING 变为 RUNNING 再到 DONE。问题是实际上没有文件上传到指定的存储桶。

可能有用的信息:

  • createAuthorizedClient函数返回一个 Bigquery 实例并适用于查询作业,因此服务帐户、私钥等可能没有问题。
  • 还尝试在谷歌的 api-explorer上手动创建和运行插入作业,并且文件在存储桶中成功创建。对项目、数据集、表和目标 uri 使用与代码中相同的值,因此这些值应该是正确的。

这是代码(粘贴完整的文件以防其他人发现这很有用):

import java.io.File;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.List;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.TableReference;

public class BigQueryJavaGettingStarted {
    
    private static final String PROJECT_ID = "123456789012";
    private static final String DATASET_ID = "MY_DATASET_NAME";
    private static final String TABLE_TO_EXPORT = "MY_TABLE_NAME";
    private static final String SERVICE_ACCOUNT_ID = "123456789012-...@developer.gserviceaccount.com";
    private static final File PRIVATE_KEY_FILE = new File("/path/to/privatekey.p12");
    private static final String DESTINATION_URI = "gs://mybucket/file.csv";

    private static final List<String> SCOPES =  Arrays.asList(BigqueryScopes.BIGQUERY);
    private static final HttpTransport TRANSPORT = new NetHttpTransport();
    private static final JsonFactory JSON_FACTORY = new JacksonFactory();
    
    public static void main (String[] args) {
        try {
            executeExtractJob();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static final void executeExtractJob() throws IOException, InterruptedException, GeneralSecurityException {
        Bigquery bigquery = createAuthorizedClient();
        
        //Create a new Extract job
        Job job = new Job();
        JobConfiguration config = new JobConfiguration();
        JobConfigurationExtract extractConfig = new JobConfigurationExtract();
        TableReference sourceTable = new TableReference();

        sourceTable.setProjectId(PROJECT_ID).setDatasetId(DATASET_ID).setTableId(TABLE_TO_EXPORT);
        extractConfig.setSourceTable(sourceTable);
        extractConfig.setDestinationUri(DESTINATION_URI);
        config.setExtract(extractConfig);
        job.setConfiguration(config);

        //Insert/Execute the created extract job
        Insert insert = bigquery.jobs().insert(PROJECT_ID, job);
        insert.setProjectId(PROJECT_ID);
        JobReference jobId = insert.execute().getJobReference();
        
        //Now check to see if the job has successfuly completed (Optional for extract jobs?)
        long startTime = System.currentTimeMillis();
        long elapsedTime;
        while (true) {
            Job pollJob = bigquery.jobs().get(PROJECT_ID, jobId.getJobId()).execute();
            elapsedTime = System.currentTimeMillis() - startTime;
            System.out.format("Job status (%dms) %s: %s\n", elapsedTime, jobId.getJobId(), pollJob.getStatus().getState());
            if (pollJob.getStatus().getState().equals("DONE")) {
                break;
            }
            //Wait a second before rechecking job status
            Thread.sleep(1000);
        }
        
    }

    private static Bigquery createAuthorizedClient() throws GeneralSecurityException, IOException {
        GoogleCredential credential = new GoogleCredential.Builder()
            .setTransport(TRANSPORT)
            .setJsonFactory(JSON_FACTORY)
            .setServiceAccountScopes(SCOPES)
            .setServiceAccountId(SERVICE_ACCOUNT_ID)
            .setServiceAccountPrivateKeyFromP12File(PRIVATE_KEY_FILE)
            .build();
        
        return Bigquery.builder(TRANSPORT, JSON_FACTORY)
            .setApplicationName("My Reports")
            .setHttpRequestInitializer(credential)
            .build();
    }
}

这是输出:

Job status (337ms) job_dc08f7327e3d48cc9b5ba708efe5b6b5: PENDING
...
Job status (9186ms) job_dc08f7327e3d48cc9b5ba708efe5b6b5: PENDING
Job status (10798ms) job_dc08f7327e3d48cc9b5ba708efe5b6b5: RUNNING
...
Job status (53952ms) job_dc08f7327e3d48cc9b5ba708efe5b6b5: RUNNING
Job status (55531ms) job_dc08f7327e3d48cc9b5ba708efe5b6b5: DONE

这是一张小桌子(大约 4MB),所以大约需要一分钟的工作似乎还可以。不知道为什么没有在存储桶中创建文件或如何进行调试。任何帮助,将不胜感激。

正如克雷格指出的那样,打印了 status.errorResult() 和 status.errors() 值。

  • getErrorResults(): {"message":"后端错误。作业中止。","re​​ason":"internalError"}
  • 获取错误():空
4

4 回答 4

0

我认为问题出在您使用的存储桶名称上——mybucket上面只是一个示例,您需要将其替换为您在 Google 存储中实际拥有的存储桶。如果您以前从未使用过 GS,那么介绍文档会有所帮助。

您的第二个问题是如何调试它——我建议在Job状态设置为DONE. 以错误结尾的作业仍然会DONE声明,不同之处在于它们附加了错误结果,因此job.getStatus().hasErrorResult()应该是正确的。(我从未使用过 Java 客户端库,所以我猜测该方法名称。)您可以在工作文档中找到更多信息。

于 2012-07-30T06:51:47.363 回答
0

看起来有一个访问被拒绝错误写入路径:gs://pixalate_test/from_java.csv。您能否确保执行导出作业的用户对存储桶具有写入权限(并且该文件尚不存在)?我已经就这个问题提交了一个内部 bigquery 错误……在这种情况下,我们应该给出一个更好的错误。.

于 2012-07-30T19:38:16.100 回答
0

我有同样的问题。但结果是我输入了错误的表名。但是,Google 并没有生成一条错误消息,说“该表不存在”。那将帮助我找到我的问题。

谢谢!

于 2019-07-03T22:00:12.167 回答
0

还有一个区别,我注意到您没有将作业类型传递为 config.setJobType(JOB_TYPE); 其中常量是私有静态最终字符串 JOB_TYPE = "extract"; 同样对于 json,也需要设置格式。

于 2019-02-11T19:21:52.683 回答