0

我在 apache 梁中运行以下示例

import apache_beam as beam
from apache_beam import Row
from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
from apache_beam.dataframe.transforms import DataframeTransform
import logging
import argparse
import sys
import pandas

logging.getLogger().setLevel(logging.INFO)

parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(sys.argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True

# Just a Dummy Dataframe Transform Function, Ignore the logic
def transformdf(a, b):
    a["addr"] = "addr-common"
    return a

p = beam.Pipeline(options=pipeline_options)

# Schema Aware Pcollection
data1 = [Row(id=1, name="abc"), Row(id=2, name="def"), Row(id=3, name="ghi")]
pcol1 = (p | "Create1" >> beam.Create(data1))

data2 = [Row(addr="addr1"), Row(addr="addr2"), Row(addr="addr3")]
pcol2 = (p | "Create2" >> beam.Create(data2))

pcol = ({"a":pcol1, "b":pcol2} | "TransformedDF" >> DataframeTransform(transformdf))
# The above throws issue with duplicate label error

pcol | "Map" >> beam.Map(lambda row: {"id":row.id, "name":row.name, "addr":row.addr}) | "Print" >> beam.Map(print)

p.run().wait_until_finish()

代码错误并出现错误`

RuntimeError:管道中已存在带有标签“TransformedDF/BatchElements(pc)”的转换`

根据链接 https://beam.apache.org/documentation/dsls/dataframes/overview/#embedding-dataframes-in-a-pipeline ,语法和用法似乎是正确的

输出 = {"a":pcol1, "b":pcol2"} | DataframeTransform(lambda/function)

我目前正在使用 apache beam 2.35.0 Python SDK 有这个问题吗?

4

0 回答 0