我有一堆自定义运算符,我想尝试使用 XcomArg 并在我的任务中使用 .output。
例如,下面我注释掉了xcom_push
返回列表:
def execute(self, context):
# context["ti"].xcom_push(key="extract_list", value=extract_list)
return extract_list
问题是我的密钥历来是“extract_list”,我在其他地方有一些对该密钥的引用。我传递了其他 xcom(例如最大 ID/时间戳),这些 xcom 被标记为return_value
.
我可以更改我推送的 xcom 的密钥吗?
这个片段有效,但关键是return_value
:
extract = FileToAzureBlobOperator(
task_id="extract-test",
remote_directories=["/input/test"],
subfolders=["test", "raw"],
params={
"start": "{{ data_interval_start }}",
"end": "{{ data_interval_end }}",
},
)
transform = PrepareParquetOperator(
task_id="transform-test",
input_files=extract.output,
output_folder="test/staging",
custom_transform_script="scripts.common.test",
partition_columns=["date_id"],
)
我已经尝试添加test = XComArg(operator=extract, key="test_key")
,然后input_files=test
在我的转换任务中也有,但没有运气。我想我需要覆盖 FileToAzureBlobOperator 中的默认键。