在处理项目时遇到了一种情况,我们想使用气流执行一些任务,但我们不允许使用 python 运算符,但被指示使用本地 BigQuery 运算符。任何人都可以帮助我设置气流变量或如何编写将由 BQ 操作员执行的条件代码之类的代码。是否可以 ?如果不是,那么我的下一个问题是,是否有可能使用 BQ 运算符从 BQ 表中获取结果并将其分配给一个 python 变量,所以想要同时使用 BQ 和 python 运算符,有什么办法吗?
问问题
100 次
1 回答
0
因此,这里有一个示例,说明如何使用 BigQuery 运算符并使用交叉通信将数据发送到另一个任务xcom_pull
。
您可以使用BigQueryGetDataOperator
orBigQueryOperator
通过自定义查询来查询数据。这些运算符将为您返回一个列表,因此您可以在另一个任务中获取它。我在示例中的 bash 运算符中使用了它:
from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryGetDataOperator,
)
from airflow.utils.dates import days_ago
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<project-name>")
BQ_LOCATION = "europe-north1"
TABLE_NAME="<table-name>"
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "<ds-name>")
with models.DAG(
"example_bigquery_operations",
schedule_interval='@once', # Override to match your needs
start_date=days_ago(1),
tags=["example"],
) as dag:
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
max_results=1,
selected_fields="name",
#location=BQ_LOCATION,
)
get_dataset_result = BashOperator(
task_id="get_dataset_result",
bash_command="echo \"{{ task_instance.xcom_pull('get_data') }}\"",
)
get_data >> get_dataset_result
[2021-12-06 17:12:07,641] {logging_mixin.py:109} INFO - Running <TaskInstance: example_bigquery_operations.get_dataset_result 2021-12-05T00:00:00+00:00 [running]> on host airflow-worker-c92mz
[2021-12-06 17:12:07,937] {taskinstance.py:1254} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=example_bigquery_operations
AIRFLOW_CTX_TASK_ID=get_dataset_result
AIRFLOW_CTX_EXECUTION_DATE=2021-12-05T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-12-05T00:00:00+00:00
[2021-12-06 17:12:07,939] {subprocess.py:52} INFO - Tmp dir root location:
/tmp
[2021-12-06 17:12:07,939] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo "[[\'Tom\']]"']
[2021-12-06 17:12:08,242] {subprocess.py:74} INFO - Output:
[2021-12-06 17:12:08,245] {subprocess.py:78} INFO - [['Tom']]
[2021-12-06 17:12:08,246] {subprocess.py:82} INFO - Command exited with return code 0
于 2021-12-06T17:27:36.410 回答