10

根据 Dataproc文档,它具有“与 BigQuery 的本机和自动集成”。

我在 BigQuery 中有一张表。我想使用我创建的 Dataproc 集群(使用 PySpark 作业)读取该表并对其执行一些分析。然后将此分析的结果写回 BigQuery。您可能会问“为什么不直接在 BigQuery 中进行分析!?” - 原因是因为我们正在创建复杂的统计模型,而 SQL 级别太高,无法开发它们。我们需要 Python 或 R 之类的东西,因此 Dataproc。

他们有任何可用的 Dataproc + BigQuery 示例吗?我找不到任何东西。

4

2 回答 2

9

首先,如本问题所述,BigQuery 连接器已预安装在Cloud Dataproc集群上。

下面是一个关于如何将 BigQuery 中的数据读入 Spark 的示例。在此示例中,我们将从 BigQuery 读取数据以执行字数统计。您使用 Spark 从 BigQuery 读取数据SparkContext.newAPIHadoopRDDSpark 文档有更多关于使用SparkContext.newAPIHadoopRDD. '

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.mapred.BigQueryMapredInputFormat
import com.google.gson.JsonObject

import org.apache.hadoop.io.LongWritable

val projectId = "<your-project-id>"
val fullyQualifiedInputTableId = "publicdata:samples.shakespeare"
val fullyQualifiedOutputTableId = "<your-fully-qualified-table-id>"
val outputTableSchema =
    "[{'name': 'Word','type': 'STRING'},{'name': 'Count','type': 'INTEGER'}]"
val jobName = "wordcount"

val conf = sc.hadoopConfiguration

// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)

// Use the systemBucket for temporary BigQuery export data used by the InputFormat.
val systemBucket = conf.get("fs.gs.system.bucket")
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, systemBucket)

// Configure input and output for BigQuery access.
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)
BigQueryConfiguration.configureBigQueryOutput(conf,
    fullyQualifiedOutputTableId, outputTableSchema)

val fieldName = "word"

val tableData = sc.newAPIHadoopRDD(conf,
    classOf[GsonBigQueryInputFormat], classOf[LongWritable], classOf[JsonObject])
tableData.cache()
tableData.count()
tableData.map(entry => (entry._1.toString(),entry._2.toString())).take(10)

您将需要使用您的设置自定义此示例,包括您的 Cloud Platform 项目 ID<your-project-id>和您的输出表 ID <your-fully-qualified-table-id>

最后,如果您最终将 BigQuery 连接器与 MapReduce 结合使用,此页面提供了有关如何使用 BigQuery 连接器编写 MapReduce 作业的示例。

于 2015-10-09T18:59:28.480 回答
1

上面的示例没有显示如何将数据写入输出表。你需要这样做:

.saveAsNewAPIHadoopFile(
hadoopConf.get(BigQueryConfiguration.TEMP_GCS_PATH_KEY), 
classOf[String], 
classOf[JsonObject], 
classOf[BigQueryOutputFormat[String, JsonObject]], hadoopConf)

其中 key: String 实际上被忽略了

于 2015-11-02T17:51:28.863 回答