首先,如本问题所述,BigQuery 连接器已预安装在Cloud Dataproc集群上。
下面是一个关于如何将 BigQuery 中的数据读入 Spark 的示例。在此示例中,我们将从 BigQuery 读取数据以执行字数统计。您使用 Spark 从 BigQuery 读取数据SparkContext.newAPIHadoopRDD
。Spark 文档有更多关于使用SparkContext.newAPIHadoopRDD
. '
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.mapred.BigQueryMapredInputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable
val projectId = "<your-project-id>"
val fullyQualifiedInputTableId = "publicdata:samples.shakespeare"
val fullyQualifiedOutputTableId = "<your-fully-qualified-table-id>"
val outputTableSchema =
"[{'name': 'Word','type': 'STRING'},{'name': 'Count','type': 'INTEGER'}]"
val jobName = "wordcount"
val conf = sc.hadoopConfiguration
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
// Use the systemBucket for temporary BigQuery export data used by the InputFormat.
val systemBucket = conf.get("fs.gs.system.bucket")
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, systemBucket)
// Configure input and output for BigQuery access.
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)
BigQueryConfiguration.configureBigQueryOutput(conf,
fullyQualifiedOutputTableId, outputTableSchema)
val fieldName = "word"
val tableData = sc.newAPIHadoopRDD(conf,
classOf[GsonBigQueryInputFormat], classOf[LongWritable], classOf[JsonObject])
tableData.cache()
tableData.count()
tableData.map(entry => (entry._1.toString(),entry._2.toString())).take(10)
您将需要使用您的设置自定义此示例,包括您的 Cloud Platform 项目 ID<your-project-id>
和您的输出表 ID <your-fully-qualified-table-id>
。
最后,如果您最终将 BigQuery 连接器与 MapReduce 结合使用,此页面提供了有关如何使用 BigQuery 连接器编写 MapReduce 作业的示例。