我是气流新手,我想安排一个工作,其中来自不同数据库记录计数的两个表必须检查它是否匹配。一种来源是 GCP,另一种来源是 Salesforce。
因此,我发现BigQueryOperator
在 GCP 端进行查询并返回 Count 结果,但我找不到任何SalesforceQueryOperator
可以在 Airflow 任务中分配的运算符。
所以基本上,我说的是我们可以用来带来计数结果的这个:
t1 = BigQueryOperator(
task_id='bigquery_test',
bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
destination_dataset_table=False,
bigquery_conn_id='bigquery_default',
google_cloud_storage_conn_id='bigquery_default',
delegate_to=False,
udf_config=False,
dag=dag,
)
我知道我们可以创建一个函数、导入库、创建与 Salesforce 的连接并运行查询以带来 Count 结果,但我不想遵循我已经尝试过的下面给出的这种方法(代码的一部分)。
def salesforcequery_count():
from simple_salesforce import Salesforce
import requests
session = requests.Session()
# manipulate the session instance (optional)
sf = Salesforce(
username='user@example.com', password='password', organizationId='OrgId',
session=session)
count_record = sf.query("SELECT count(id) FROM Contact")
// for row in data:
// process(row)
return 'count_record'
我想创建一个看起来像SalesforceQueryOperator
并且应该像BigQueryOperator
在 Salesforce 表中点击查询并带来结果的自定义运算符。
这是参考:https ://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html
任何帮助将不胜感激。