1

我是气流新手,我想安排一个工作,其中来自不同数据库记录计数的两个表必须检查它是否匹配。一种来源是 GCP,另一种来源是 Salesforce。

因此,我发现BigQueryOperator在 GCP 端进行查询并返回 Count 结果,但我找不到任何SalesforceQueryOperator可以在 Airflow 任务中分配的运算符。

所以基本上,我说的是我们可以用来带来计数结果的这个:

t1 = BigQueryOperator(
        task_id='bigquery_test',
        bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
        destination_dataset_table=False,
        bigquery_conn_id='bigquery_default',             
        google_cloud_storage_conn_id='bigquery_default',
        delegate_to=False,
        udf_config=False,
        dag=dag,
 )

我知道我们可以创建一个函数、导入库、创建与 Salesforce 的连接并运行查询以带来 Count 结果,但我不想遵循我已经尝试过的下面给出的这种方法(代码的一部分)。

def salesforcequery_count():
from simple_salesforce import Salesforce
import requests

session = requests.Session()
# manipulate the session instance (optional)
sf = Salesforce(
   username='user@example.com', password='password', organizationId='OrgId',
   session=session)
   count_record = sf.query("SELECT count(id) FROM Contact")
//   for row in data:
//   process(row)
    return 'count_record'

我想创建一个看起来像SalesforceQueryOperator并且应该像BigQueryOperator在 Salesforce 表中点击查询并带来结果的自定义运算符。

这是参考:https ://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html

任何帮助将不胜感激。

4

1 回答 1

1

您可以使用现有的SalesforceHook创建您自己的自定义运算符。

这是一个例子:

from airflow.contrib.hooks.salesforce_hook import SalesforceHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SalesforceQueryOperator(BaseOperator):
    """
    Make a query against Salesforce
    Return result as dict.
    """
    template_fields = ("query",)

    @apply_defaults
    def __init__(self,
                 conn_id,
                 query=None,
                 *args,
                 **kwargs
                 ):
        super(SalesforceQueryOperator, self).__init__(*args, **kwargs)

        self.conn_id = conn_id
        self.query = query

    def execute(self, context):
        sf_hook = SalesforceHook(conn_id=self.conn_id)

        results = sf_hook.make_query(self.query)

        return results

然后在你的 DAG 中使用它:

t2 = SalesforceQueryOperator(
        task_id='salesforce_test',
        query='SELECT count(id) FROM Contact',
        conn_id='salesforce_default',             
        dag=dag,
 )

salesforce_default您在 AirFlow 中添加的连接在哪里。您可以在此处查看如何添加它:Salesforce Connection

于 2021-02-13T16:01:41.573 回答