2

我正在使用 azureml Python SDK 构建 Azure ML 管道。管道调用 PythonScriptStep,它将数据存储在 AML 工作区的 workspaceblobstore 上。

我想扩展管道以将管道数据导出到 Azure 数据湖(第 1 代)。据我了解,Azure ML 不支持将 PythonScriptStep 的输出直接连接到 Azure Data Lake(第 1 代)。因此,我在管道中添加了一个额外的 DataTransferStep,它将 PythonScriptStep 的输出作为直接输入到 DataTransferStep。根据 Microsoft 文档,这应该是可能的。

到目前为止,我已经构建了这个解决方案,只有这会在 Gen 1 Data Lake 上产生一个 0 字节的文件。我认为 output_export_blob PipelineData 没有正确引用 test.csv,因此 DataTransferStep 找不到输入。如何将 DataTransferStep 与 PythonScriptStep 的 PipelineData 输出正确连接?

我遵循的示例: https ://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-with-data-dependency-步骤.ipynb

管道.py

input_dataset = delimited_dataset(
    datastore=prdadls_datastore,
    folderpath=FOLDER_PATH_INPUT,
    filepath=INPUT_PATH
)

output_export_blob = PipelineData(
    'export_blob',
    datastore=workspaceblobstore_datastore,
)

test_step = PythonScriptStep(
    script_name="test_upload_stackoverflow.py",
    arguments=[
        "--output_extract", output_export_blob,
    ],
    inputs=[
        input_dataset.as_named_input('input'),
    ],
    outputs=[output_export_blob],
    compute_target=aml_compute,
    source_directory="."
)

output_export_adls = DataReference(
    datastore=prdadls_datastore, 
    path_on_datastore=os.path.join(FOLDER_PATH_OUTPUT, 'test.csv'),
    data_reference_name='export_adls'        
)

export_to_adls = DataTransferStep(
    name='export_output_to_adls',
    source_data_reference=output_export_blob,
    source_reference_type='file',
    destination_data_reference=output_export_adls,
    compute_target=adf_compute
)

pipeline = Pipeline(
    workspace=aml_workspace, 
    steps=[
        test_step, 
        export_to_adls
    ]
)

test_upload_stackoverflow.py

import os
import pathlib
from azureml.core import Datastore, Run

parser = argparse.ArgumentParser("train")
parser.add_argument("--output_extract", type=str)
args = parser.parse_args() 

run = Run.get_context()
df_data_all = (
    run
    .input_datasets["input"]
    .to_pandas_dataframe()
)

os.makedirs(args.output_extract, exist_ok=True)
df_data_all.to_csv(
    os.path.join(args.output_extract, "test.csv"), 
    index=False
)
4

1 回答 1

0

代码示例非常有用。感谢那。你是对的,它可能会令人困惑PythonScriptStep -> PipelineData。即使没有DataTransferStep.

我不知道 100% 发生了什么,但我想我会吐出一些想法:

  1. 您的 , 是否PipelineData包含 export_blob“test.csv”文件?在对DataTransferStep. 您可以使用 SDK 或更轻松地使用 UI 来验证这一点。
    1. 转到 PipelineRun 页面,单击有PythonScriptStep问题的。
    2. 在“输出 + 日志”页面上,有一个“数据输出”部分(最初加载速度很慢)
    3. 打开它,你会看到输出 PipelineDatas 然后点击“查看输出”
    4. 在 Azure 门户或 Azure 存储资源管理器中导航到给定路径。 在此处输入图像描述 在此处输入图像描述
  2. 在调用时,test_upload_stackoverflow.py您将其PipelineData视为目录,.to_csv()而不是您刚刚调用的文件df_data_all.to_csv(args.output_extract, index=False)。也许尝试定义PipelineDatawith is_directory=True。不确定这是否是必需的。
于 2020-06-10T18:21:40.647 回答