4

气流新手。尝试运行 sql 并将结果存储在 BigQuery 表中。

出现以下错误。不确定在哪里设置 default_rpoject_id。

请帮我。

错误:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 585, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/bigquery_operator.py", line 82, in execute
    self.allow_large_results, self.udf_config, self.use_legacy_sql)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 228, in run_query
    default_project_id=self.project_id)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 917, in _split_tablename
    assert default_project_id is not None, "INTERNAL: No default project is specified"
AssertionError: INTERNAL: No default project is specified

代码:

sql_bigquery = BigQueryOperator(
        task_id='sql_bigquery',
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        bql='''
            #standardSQL
                SELECT ID, Name, Group, Mark, RATIO_TO_REPORT(Mark) OVER(PARTITION BY Group) AS percent FROM `tensile-site-168620.temp.marks`
                ''',
        destination_dataset_table='temp.percentage',
        dag=dag
        )
4

2 回答 2

4

bigquery_conn_id='bigquery'编辑:在单独的 python 脚本中运行下面的代码后,我最终通过在 BigQueryOperator 任务中添加参数来解决这个问题。

显然,您需要在 Airflow UI 的 Admin -> Connection 中指定您的项目 ID。您必须将其作为 JSON 对象执行,例如“project”:“”。

就我个人而言,我无法让网络服务器在 GCP 上运行,所以这是不可行的。这里有一个程序化的解决方案:

from airflow.models import Connection
from airflow.settings import Session

session = Session()
gcp_conn = Connection(
    conn_id='bigquery',
    conn_type='google_cloud_platform',
    extra='{"extra__google_cloud_platform__project":"<YOUR PROJECT HERE>"}')
if not session.query(Connection).filter(
        Connection.conn_id == gcp_conn.conn_id).first():
    session.add(gcp_conn)
    session.commit()

这些建议来自一个类似的问题here。

于 2017-08-11T14:30:25.233 回答
1

在本地运行气流时出现相同的错误。我的解决方案是添加以下连接字符串作为环境变量:

AIRFLOW_CONN_BIGQUERY_DEFAULT="google-cloud-platform://?extra__google_cloud_platform__project=<YOUR PROJECT HERE>"

BigQueryOperator 使用“bigquery_default”连接。如果未指定,本地气流将使用缺少属性的连接的内部版本project_id。如您所见,上面的连接字符串提供了该project_id属性。

在启动时,Airflow 会将以“AIRFLOW_”开头的环境变量加载到内存中。此机制可用于覆盖气流属性并在本地运行时提供连接,如气流文档here中所述。请注意,这也适用于直接运行气流而不启动 Web 服务器。

因此,我为所有连接设置了环境变量,例如AIRFLOW_CONN_MYSQL_DEFAULT. 我已将它们放入从我的 IDE 获取的 .ENV 文件中,但将它们放入您的 .bash_profile 也可以。

当您在 Cloud Composer 上查看您的气流实例时,您会看到在“bigquery_default”连接处project_id设置了属性。这就是 BigQueryOperator 在通过 Cloud Composer 运行时工作的原因。

(我正在使用气流 1.10.2 和 BigQuery 1.10.2)

于 2019-09-27T10:35:40.787 回答