I have this PTransform that maps data to beam.Row:

from typing import Any

import apache_beam as beam
from apache_beam.pvalue import PCollection

class MapToBeamRow(beam.PTransform):
    def expand(self, pcoll: PCollection[Any]) -> PCollection[beam.Row]:
        return (
            pcoll
            | beam.Map(lambda x: beam.Row(foo='foo'))
        )

And a PTransform that applies SqlTransform to that beam.Row:

from apache_beam.transforms.sql import SqlTransform

class FilterValuesInSegment(beam.PTransform):
    def __init__(self, where_clause: str):
        self.where_clause = where_clause

    def expand(self, pcoll: PCollection[beam.Row]) -> PCollection[Any]:
        return (
            pcoll
            # | beam.Map(lambda x: beam.Row(foo='foo'))
            | SqlTransform("SELECT * FROM PCOLLECTION")
        )

It is invoked in a unit test as follows:

from apache_beam.testing.test_pipeline import TestPipeline

with TestPipeline() as p:
    _ = (
        p
        | beam.Create(records)
        | MapToBeamRow()
        | FilterValuesInSegment()
        | beam.Map(print)
    )

Running this code raises the exception java.lang.IllegalArgumentException: Unknown Coder URN beam:coder:pickled_python:v1. Known URNs: [...]

Oddly, if I uncomment the commented-out line in FilterValuesInSegment, the code does work. Why am I getting the coder exception?
