
I am trying to develop a custom pipeline using Kubeflow Pipelines (KFP) components on Vertex AI (Google Cloud Platform). The pipeline's steps are:

  1. Read data from a BigQuery table
  2. Create a pandas DataFrame
  3. Train a K-Means model on the DataFrame
  4. Deploy the model to an endpoint

Here is the code for step 2. I had to use Output[Artifact] as the output type, because the pd.DataFrame type I found here does not work.

from kfp.v2.dsl import component, Output, Artifact

@component(base_image="python:3.9", packages_to_install=["google-cloud-bigquery","pandas","pyarrow"])
def create_dataframe(
    project: str,
    region: str,
    destination_dataset: str,
    destination_table_name: str,
    df: Output[Artifact],
):
    
    from google.cloud import bigquery
    
    client = bigquery.Client(project=project, location=region)
    dataset_ref = bigquery.DatasetReference(project, destination_dataset)
    table_ref = dataset_ref.table(destination_table_name)
    table = client.get_table(table_ref)

    df = client.list_rows(table).to_dataframe()

Here is the code for step 3:

from kfp.v2.dsl import component, Input, Output, Artifact, Model

@component(base_image="python:3.9", packages_to_install=['sklearn'])
def kmeans_training(
        dataset: Input[Artifact],
        model: Output[Model],
        num_clusters: int,
):
    from sklearn.cluster import KMeans
    model = KMeans(num_clusters, random_state=220417)
    model.fit(dataset)

The pipeline run stopped with the following error:

TypeError: float() argument must be a string or a number, not 'Artifact'

Is it possible to convert an Artifact to a numpy array or DataFrame?
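As I understand it, the Artifact object KFP passes into a component is only a handle with a `.path` (and `.uri`) attribute pointing at mounted storage; the data itself has to be serialized to that path by one step and deserialized from it by the next. A minimal local sketch of that idea, with a hypothetical stand-in class replacing the real artifact object (not part of the kfp API):

```python
import os
import tempfile

import pandas as pd


# Stand-in for a KFP artifact: in a real pipeline, KFP injects an object
# whose .path points at a mounted GCS location. This class is only an
# illustration of the interface, not part of the kfp API.
class FakeArtifact:
    def __init__(self, path):
        self.path = path


tmpdir = tempfile.mkdtemp()
artifact = FakeArtifact(os.path.join(tmpdir, "data.csv"))

# Producer step: serialize the DataFrame to the artifact's path.
df_out = pd.DataFrame({"x": [1.0, 2.0], "y": [3.0, 4.0]})
df_out.to_csv(artifact.path, index=False)

# Consumer step: read the file at the artifact's path back into a DataFrame.
df_in = pd.read_csv(artifact.path)
```

So the conversion happens via the file at `artifact.path`, not on the Artifact object itself.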


1 Answer


I found a solution with the following code. Now I can use the output of step 2 to train the model in step 3.

Step 2:

from kfp.v2.dsl import component, Output, Dataset

@component(base_image="python:3.9", packages_to_install=["google-cloud-bigquery","pandas","pyarrow"])
def create_dataframe(
    project: str,
    region: str,
    destination_dataset: str,
    destination_table_name: str,
    df: Output[Dataset],
):
    
    from google.cloud import bigquery
    
    client = bigquery.Client(project=project, location=region)
    dataset_ref = bigquery.DatasetReference(project, destination_dataset)
    table_ref = dataset_ref.table(destination_table_name)
    table = client.get_table(table_ref)

    train = client.list_rows(table).to_dataframe()
    
    train.to_csv(df.path, index=False)  # index=False keeps the row index out of the training data

Step 3:

from kfp.v2.dsl import component, Input, Output, Dataset, Model

@component(base_image="python:3.9", packages_to_install=['scikit-learn','pandas','joblib'])
def kmeans_training(
        dataset: Input[Dataset],
        model_artifact: Output[Model],
        num_clusters: int,
):
    from sklearn.cluster import KMeans
    import pandas as pd
    from joblib import dump
    
    data = pd.read_csv(dataset.path)
    
    model = KMeans(num_clusters, random_state=220417)
    model.fit(data)
    
    dump(model, model_artifact.path)
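The handoff the two components perform can be checked locally without Vertex AI: write the training frame to a CSV path, read it back, fit KMeans, and persist the model with joblib. This is only a local sketch of the same serialization steps, with temp-file paths standing in for `df.path` and `model_artifact.path`:

```python
import os
import tempfile

import pandas as pd
from joblib import dump, load
from sklearn.cluster import KMeans

tmpdir = tempfile.mkdtemp()
csv_path = os.path.join(tmpdir, "train.csv")        # stands in for df.path
model_path = os.path.join(tmpdir, "model.joblib")   # stands in for model_artifact.path

# Step 2 equivalent: serialize the DataFrame to the shared path.
train = pd.DataFrame({"a": [0.0, 0.1, 10.0, 10.1],
                      "b": [0.0, 0.2, 9.9, 10.0]})
train.to_csv(csv_path, index=False)

# Step 3 equivalent: read it back and train.
data = pd.read_csv(csv_path)
model = KMeans(n_clusters=2, random_state=220417, n_init=10)
model.fit(data)
dump(model, model_path)

# The persisted model can be reloaded, e.g. by a later deployment step.
restored = load(model_path)
```

The key point is that only the file paths cross the component boundary; each component re-creates the Python objects on its own side.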
answered 2021-11-17T10:40:34.060