我需要将数据从 xcom 提取到一个 python 变量中,该变量将使用一些正则表达式进行转换并进一步传递。但是,我无法在任何地方找到如何在不使用任何运算符的情况下从 xcom 读取数据(直接进入 python 代码)。我在 AWS 上使用气流 2.0.2 的 MWAA 并使用下面的代码片段。
s3Path = ""
def pull_from_xcom(**context):
global s3Path
msg = context['ti'].xcom_pull(task_ids='sqs', key='messages')
s3Path = msg['Messages'][0]['Body']
SQSRUN = SQSSensor(
task_id='sqs',
poke_interval=0,
timeout=10,
sqs_queue=SQS_URL,
aws_conn_id=AWS)
xcomGet = PythonOperator(
task_id='xcom_pull',
python_callable=pull_from_xcom,
provide_context=True,
depends_on_past=False)
# s3Path Transformations
para1 = re.findall(r"(para1=\w+)",s3Path)
para2 = re.findall(r"(para2=\w+)",s3Path)
sparkstep = #Constructing dict using para1 and para2 for spark job submission
#Calling sparkStep
sparkTransform = EmrAddStepsOperator(
task_id='S3PathTransform',
job_flow_id=Variable.get("EMR"),
aws_conn_id=AWS,
steps=sparkstep,
)
#Further tasks in dag
这不起作用,因为 python 运算符将在 dag 运行后工作,而我在运行 dag 之前将 s3Path 转换后的值用于另一个运算符。我尝试将 s3Path 值设置为变量并读取它,但这不起作用,因为在上传 dag 时没有创建该变量。
我看到它ti.xcom_pull(key=messages, task_ids='sqs')
可以用来从 xcom 中提取数据,但是我应该从哪里得到 ti?有没有办法让任务实例在不使用任何运算符的情况下与 xcom 一起工作。
基本上问题是如何获得 SQSRUN 发送给 xcom 的值。我无法找到有关如何使用 SQSSensor 获取的值的任何文档或在线链接。非常感谢一些帮助。