
I am trying to connect a BigQuery dataset to Databricks and run a script with PySpark.

Steps I have done:

  • I placed the BigQuery JSON API credentials file into DBFS on Databricks for connection access.

  • Then I added spark-bigquery-latest.jar to the cluster libraries and ran my script.

When I run this script, I don't hit any errors:

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName('bq')
    .master('local[4]')
    .config('parentProject', 'google-project-ID')
    .config('spark.jars', 'dbfs:/FileStore/jars/jarlocation.jar')
    .getOrCreate()
)
df = spark.read.format("bigquery") \
  .option("credentialsFile", "/dbfs/FileStore/tables/bigqueryapi.json") \
  .option("parentProject", "google-project-ID") \
  .option("project", "Dataset-Name") \
  .option("table", "dataset.schema.tablename") \
  .load()
df.show()

However, instead of calling a single table in that schema, I tried to call all the tables under it with a query, like this:

from pyspark.sql import SparkSession
from google.cloud import bigquery
spark = (
    SparkSession.builder
    .appName('bq')
    .master('local[4]')
    .config('parentProject', 'google-project-ID')
    .config('spark.jars', 'dbfs:/FileStore/jars/jarlocation.jar')
    .getOrCreate()
)
client = bigquery.Client()
table_list = 'dataset.schema'
tables = client.list_tables(table_list)

# collect the table names (list.append mutates in place and returns None,
# so the list must be initialized first)
tlist = []
for table in tables:
    tlist.append(table.table_id)

for i in tlist:
    sql_query = "select * from `dataset.schema." + i + "`"
    df = spark.read.format("bigquery") \
        .option("credentialsFile", "/dbfs/FileStore/tables/bigqueryapi.json") \
        .option("parentProject", "google-project-ID") \
        .option("project", "Dataset-Name") \
        .option("query", sql_query) \
        .load()
    df.show()

Or

this script:

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName('bq')
    .master('local[4]')
    .config('parentProject', 'google-project-ID')
    .config('spark.jars', 'dbfs:/FileStore/jars/jarlocation.jar')
    .getOrCreate()
)
sql_query = """select * from `dataset.schema.tablename`"""
df = spark.read.format("bigquery") \
    .option("credentialsFile", "/dbfs/FileStore/tables/bigqueryapi.json") \
    .option("parentProject", "google-project-ID") \
    .option("project", "Dataset-Name") \
    .option("query", sql_query) \
    .load()
df.show()

I get this unusual error:

IllegalArgumentException: A project ID is required for this service but could not be determined from the builder or the environment.  Please set a project ID using the builder.
---------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
<command-131090852> in <module>
     35   .option("parentProject", "google-project-ID") \
     36   .option("project", "Dataset-Name") \
---> 37   .option("query", sql_query).load()
     38 #df.show()
     39 

/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    182             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    183         else:
--> 184             return self._df(self._jreader.load())
    185 
    186     @since(1.4)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    131                 # Hide where the exception came from that shows a non-Pythonic
    132                 # JVM exception message.
--> 133                 raise_from(converted)
    134             else:
    135                 raise

/databricks/spark/python/pyspark/sql/utils.py in raise_from(e)

IllegalArgumentException: A project ID is required for this service but could not be determined from the builder or the environment.  Please set a project ID using the builder.

When I call it as a table, it does recognize my project ID, but when I run it as a query I get this error.

I have tried to figure it out and searched many websites for an answer, but couldn't get a clear one.

Your help is much appreciated... Thanks in advance...


2 Answers


Can you avoid using a query and just use the table option?

from pyspark.sql import SparkSession
from google.cloud import bigquery
spark = (
    SparkSession.builder
    .appName('bq')
    .master('local[4]')
    .config('parentProject', 'google-project-ID')
    .config('spark.jars', 'dbfs:/FileStore/jars/jarlocation.jar')
    .getOrCreate()
)
client = bigquery.Client()
table_list = 'dataset.schema'
tables = client.list_tables(table_list)

# collect the table names; list_tables yields TableListItem objects,
# whose table_id attribute holds the plain table name
tlist = []
for table in tables:
    tlist.append(table.table_id)

for i in tlist:
    df = spark.read.format("bigquery") \
        .option("credentialsFile", "/dbfs/FileStore/tables/bigqueryapi.json") \
        .option("parentProject", "google-project-ID") \
        .option("project", "Dataset-Name") \
        .option("table", "dataset.schema." + i) \
        .load()
    df.show()
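
If the SQL query interface is still needed, the spark-bigquery connector's documentation requires two extra settings before a query read will work. A minimal sketch, assuming the same credentials file and placeholder names as in the question (the materialization dataset name here is a placeholder that the service account must be able to write to):

# Sketch only: per the spark-bigquery connector docs, reading via the
# "query" option needs viewsEnabled plus a dataset where BigQuery can
# materialize the query results.
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "schema")  # placeholder dataset

df = spark.read.format("bigquery") \
    .option("credentialsFile", "/dbfs/FileStore/tables/bigqueryapi.json") \
    .option("parentProject", "google-project-ID") \
    .option("query", "select * from `dataset.schema.tablename`") \
    .load()
df.show()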
answered 2020-12-15T08:56:53.563

In my case I got the same exception, but it was because I had not specified the parentProject configuration value, i.e. the ID of the BigQuery project I was connecting to.
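
A minimal sketch of that fix, assuming the same connector and the placeholder project/path names from the question:

# Hypothetical illustration: parentProject identifies the GCP project that
# owns (and is billed for) the BigQuery connection; omitting it can trigger
# the "A project ID is required" IllegalArgumentException.
df = spark.read.format("bigquery") \
    .option("credentialsFile", "/dbfs/FileStore/tables/bigqueryapi.json") \
    .option("parentProject", "google-project-ID") \
    .option("table", "dataset.schema.tablename") \
    .load()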

answered 2021-02-25T10:51:18.547