我正在尝试将 BigQuery 数据集连接到 Databrick 并使用 Pyspark 运行脚本。
我做过的程序:
我将 BigQuery Json API 修补到 dbfs 中的 databrick 以进行连接访问。
然后我在集群库中添加了 spark-bigquery-latest.jar 并运行了我的脚本。
当我运行这个脚本时,我没有遇到任何错误。
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName('bq')
.master('local[4]')
.config('parentProject', 'google-project-ID')
.config('spark.jars', 'dbfs:/FileStore/jars/jarlocation.jar') \
.getOrCreate()
)
df = spark.read.format("bigquery").option("credentialsFile", "/dbfs/FileStore/tables/bigqueryapi.json") \
.option("parentProject", "google-project-ID") \
.option("project", "Dataset-Name") \
.option("table","dataset.schema.tablename") \
.load()
df.show()
但是,我没有在该架构中调用单个表,而是尝试使用如下查询调用它下的所有表:
from pyspark.sql import SparkSession
from google.cloud import bigquery
spark = (
SparkSession.builder
.appName('bq')
.master('local[4]')
.config('parentProject', 'google-project-ID')
.config('spark.jars', 'dbfs:/FileStore/jars/jarlocation.jar') \
.getOrCreate()
)
client = bigquery.Client()
table_list = 'dataset.schema'
tables = client.list_tables(table_list)
for table in tables:
tlist = tlist.append(table)
for i in tlist:
sql_query = """select * from `dataset.schema.' + i +'`"""
df = spark.read.format("bigquery").option("credentialsFile", "/dbfs/FileStore/tables/bigqueryapi.json") \
.option("parentProject", "google-project-ID") \
.option("project", "Dataset-Name") \
.option("query", sql_query).load()
df.show()
或者
这个脚本:
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName('bq')
.master('local[4]')
.config('parentProject', 'google-project-ID')
.config('spark.jars', 'dbfs:/FileStore/jars/jarlocation.jar') \
.getOrCreate()
)
sql_query = """select * from `dataset.schema.tablename`"""
df = spark.read.format("bigquery").option("credentialsFile", "/dbfs/FileStore/tables/bigqueryapi.json") \
.option("parentProject", "google-project-ID") \
.option("project", "Dataset-Name") \
.option("query", sql_query).load()
df.show()
我收到这个不寻常的错误:
IllegalArgumentException: A project ID is required for this service but could not be determined from the builder or the environment. Please set a project ID using the builder.
---------------------------------------------------------------------------
IllegalArgumentException Traceback (most recent call last)
<command-131090852> in <module>
35 .option("parentProject", "google-project-ID") \
36 .option("project", "Dataset-Name") \
---> 37 .option("query", sql_query).load()
38 #df.show()
39
/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
182 return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
183 else:
--> 184 return self._df(self._jreader.load())
185
186 @since(1.4)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
131 # Hide where the exception came from that shows a non-Pythonic
132 # JVM exception message.
--> 133 raise_from(converted)
134 else:
135 raise
/databricks/spark/python/pyspark/sql/utils.py in raise_from(e)
IllegalArgumentException: A project ID is required for this service but could not be determined from the builder or the environment. Please set a project ID using the builder.
当我将其称为表时,它确实可以识别我的项目 ID,但是当我将其作为查询运行时,我会收到此错误。
我试图弄清楚并通过许多网站寻找答案,但无法得到明确的答案。
非常感谢您的帮助...在此先感谢...