2

我是 gcloud 和 BigQuery 的新手,想使用 spark 从 BigQuery 读取数据。我使用了适用于 Java 的 Google API 客户端库。并能够与 BigQuery 连接。我得到了com.google.api.services.bigquery.Bigquery对象并能够打印读取的数据集、tableId 和 tableData

我的问题是

如何将此 BigQuery 身份验证对象(凭据对象)连接到 spark 或者无论如何将此对象与 hadoopApi 一起使用

如果不可能将凭证对象传递给 newHadoopAPi

GoogleAuthorizationCodeFlow flow = getFlow();
    GoogleTokenResponse response = flow.newTokenRequest(authorizationCode)
            .setRedirectUri(REDIRECT_URI).execute();
    Credential credential=flow.createAndStoreCredential(response, null);
    return credential; 

我的 Hadoop api 代码是我想使用我的凭证对象的地方

val tableData = sc.newAPIHadoopRDD(
  conf,
  classOf[GsonBigQueryInputFormat],
  classOf[LongWritable],
  classOf[JsonObject]).
4

2 回答 2

3

我认为适用于 Hadoop 的 BigQuery 连接器可能会解决您的问题,而无需您编写自己的低级客户端。看看:https ://cloud.google.com/hadoop/bigquery-connector

这是一个使用它将 Spark 连接到 BigQuery 的示例:https ://cloud.google.com/hadoop/examples/bigquery-connector-spark-example

于 2016-03-25T18:33:47.597 回答
1

感谢@michael 在您的链接的帮助下,我找到了解决方案

只需在 hadoop 配置上禁用服务帐户

hadoopConfiguration.set("fs.gs.auth.service.account.enable", "false")

以下代码将起作用

val hadoopConfiguration = sc.hadoopConfiguration
//BigQueryConfiguration.
hadoopConfiguration.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
hadoopConfiguration.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
hadoopConfiguration.set("fs.gs.project.id", projectId);
hadoopConfiguration.set("fs.gs.auth.service.account.enable", "false")
hadoopConfiguration.set("fs.gs.auth.client.id",
  clientId)
hadoopConfiguration.set("fs.gs.auth.client.secret",
  clientSecret)
hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
hadoopConfiguration.set("fs.gs.auth.client.file", tokenPath);
hadoopConfiguration.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)

// Configure input and output for BigQuery access.
com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.configureBigQueryInput(hadoopConfiguration, dataSetId + "." + tableId)
val tableData = sc.newAPIHadoopRDD(
  hadoopConfiguration,
  classOf[GsonBigQueryInputFormat],
  classOf[LongWritable],
  classOf[JsonObject])

其中令牌路径包含刷新令牌

{
    "credentials": {
        "user": {
            "access_token":     "ya29..wgL6fH2Gx5asdaadsBl2Trasd0sBqV_ZAS7xKDtNS0z4Qyv5ypassdh0soplQ",
            "expiration_time_millis": 1460473581255,
            "refresh_token": "XXXXXXXXXxxxxxxxxx"
            }
       }
}
于 2016-04-18T05:01:14.083 回答