0

我需要将数据从 xcom 提取到一个 python 变量中,该变量将使用一些正则表达式进行转换并进一步传递。但是,我无法在任何地方找到如何在不使用任何运算符的情况下从 xcom 读取数据(直接进入 python 代码)。我在 AWS 上使用气流 2.0.2 的 MWAA 并使用下面的代码片段。

s3Path = ""
def pull_from_xcom(**context):
        global s3Path
        msg = context['ti'].xcom_pull(task_ids='sqs', key='messages')
        s3Path = msg['Messages'][0]['Body']

    SQSRUN = SQSSensor(
    task_id='sqs',
    poke_interval=0,
    timeout=10,
    sqs_queue=SQS_URL,
    aws_conn_id=AWS)

    xcomGet = PythonOperator(
    task_id='xcom_pull',
    python_callable=pull_from_xcom,
    provide_context=True,
    depends_on_past=False)

    # s3Path Transformations
    para1 = re.findall(r"(para1=\w+)",s3Path)
    para2 = re.findall(r"(para2=\w+)",s3Path)

    sparkstep = #Constructing dict using para1 and para2 for spark job submission

    #Calling sparkStep
    sparkTransform = EmrAddStepsOperator(
            task_id='S3PathTransform',
            job_flow_id=Variable.get("EMR"),
            aws_conn_id=AWS,
            steps=sparkstep,
        )
        #Further tasks in dag

这不起作用,因为 python 运算符将在 dag 运行后工作,而我在运行 dag 之前将 s3Path 转换后的值用于另一个运算符。我尝试将 s3Path 值设置为变量并读取它,但这不起作用,因为在上传 dag 时没有创建该变量。

我看到它ti.xcom_pull(key=messages, task_ids='sqs')可以用来从 xcom 中提取数据,但是我应该从哪里得到 ti?有没有办法让任务实例在不使用任何运算符的情况下与 xcom 一起工作。

基本上问题是如何获得 SQSRUN 发送给 xcom 的值。我无法找到有关如何使用 SQSSensor 获取的值的任何文档或在线链接。非常感谢一些帮助。

4

1 回答 1

1

我看到 ti.xcom_pull(key=messages, task_ids='sqs') 可用于从 xcom 提取数据,但我应该从哪里获得ti

ti在执行上下文中传递。您的代码段演示了这是如何完成的。

有没有办法让任务实例在不使用任何运算符的情况下与 xcom 一起工作?

是的,您可以通过类似 Airflow 的方式查询数据库来获取 xcom。

from airflow.utils.session import provide_session
from airflow.models.xcom import XCom

@provide_session
def get_sqs_messages(session):
    query = XCom.get_many(
        key="messages",
        dag_ids="dag-id",
        task_ids="sqs",
        session=session,
        limit=1
    )
    # ensure the most recent value is retrieved.
    query = query.order_by("execution_date desc")
    xcom = query.with_entities(XCom.value).first()

    if xcom:
       return XCom.deserialize_value(xcom)

在您的代码段中,您似乎s3Path在 dag 模块中设置全局并在运算符中覆盖其值。 EmrAddStepsOperator当模块被解析为绑定到的初始值时被初始化s3Path

鉴于您的目标是从 xcom 值中steps获取初始化值,有一种更好的方法,EmrAddStepsOperator

steps传递给EmrAddStepsOperator构造函数的 kwargs 是模板化的。这意味着您可以为其值提供 Jinja2 模板字符串,这在任务实例的初始化期间被嵌入。

sparkstep可以声明为:

sparkstep = "{{sparkstep_from_messsages(ti.xcom_pull(task_ids='sqs', key='messages'))}}"

sparkTransform = EmrAddStepsOperator(
            task_id='S3PathTransform',
            job_flow_id=Variable.get("EMR"),
            aws_conn_id=AWS,
            steps=sparkstep,
        )

在那里,从 xcom 提取的值被传递给一个名为sparkstep_from_messages如下定义的函数。

def sparkstep_from_messages(messages):
    # s3Path Transformations
    para1 = re.findall(r"(para1=\w+)",s3Path)
    para2 = re.findall(r"(para2=\w+)",s3Path)

    sparkstep = #Constructing dict using para1 and para2 for spark job submission
    return sparkstep

您必须在 DAG 初始化中将此函数作为user_defined_macros提供,以便它在模板上下文中可用。

user_defined_macros = dict(
    sparkstep_from_messages=sparkstep_from_messages
)

dag = DAG(dag_id="sample-dag", user_defined_macros=user_defined_macros)
于 2021-10-12T21:53:31.833 回答