0

我有一堆自定义运算符,我想尝试使用 XcomArg 并在我的任务中使用 .output。

例如,下面我注释掉了xcom_push返回列表:

def execute(self, context):
    # context["ti"].xcom_push(key="extract_list", value=extract_list)
    return extract_list

问题是我的密钥历来是“extract_list”,我在其他地方有一些对该密钥的引用。我传递了其他 xcom(例如最大 ID/时间戳),这些 xcom 被标记为return_value.

我可以更改我推送的 xcom 的密钥吗?

这个片段有效,但关键是return_value

    extract = FileToAzureBlobOperator(
        task_id="extract-test",
        remote_directories=["/input/test"],
        subfolders=["test", "raw"],
        params={
            "start": "{{ data_interval_start }}",
            "end": "{{ data_interval_end }}",
        },
    )

    transform = PrepareParquetOperator(
        task_id="transform-test",
        input_files=extract.output,
        output_folder="test/staging",
        custom_transform_script="scripts.common.test",
        partition_columns=["date_id"],
    )

我已经尝试添加test = XComArg(operator=extract, key="test_key"),然后input_files=test在我的转换任务中也有,但没有运气。我想我需要覆盖 FileToAzureBlobOperator 中的默认键。

4

2 回答 2

2

我相信您应该能够将值作为字典返回以获得您想要的内容:

return {"extract_list": extract_list}
于 2021-11-27T14:08:04.410 回答
0

这是一个使用 XComArg 工作的示例。它需要稍微更改我的 DAG 文件。

    extract = FileToAzureBlobOperator(
        task_id="extract-test",
        remote_directories=["/input/test"],
        subfolders=["test", "raw"],
        params={
            "start": "{{ data_interval_start }}",
            "end": "{{ data_interval_end }}",
        },
    )

    extracted_files = XComArg(extract, "extract_list")

    transform = PrepareParquetOperator(
        task_id="transform",
        input_files=extracted_files,
    )

    transformed_files = XComArg(transform, "filter_list")

    finalize = DatasetToDatasetOperator(
        task_id="finalize",
        input_files=transformed_files,
    
    )

    extracted_files >> transformed_files >> finalize

最终我的计划是xcom.push()从我的运营商中删除并直接返回值,这样我就可以在.output不需要 XComArg 行的情况下使用。我只需要清理对其他区域中自定义键名的引用。

于 2021-12-01T11:17:08.390 回答