I have this PTransform that maps data to beam.Row:
from typing import Any

import apache_beam as beam
from apache_beam.pvalue import PCollection

class MapToBeamRow(beam.PTransform):
    def expand(self, pcoll: PCollection[Any]) -> PCollection[beam.Row]:
        return (
            pcoll
            | beam.Map(lambda x: beam.Row(foo='foo'))
        )
and a PTransform that applies SqlTransform to that beam.Row:
from apache_beam.transforms.sql import SqlTransform

class FilterValuesInSegment(beam.PTransform):
    def __init__(self, where_clause: str):
        self.where_clause = where_clause

    def expand(self, pcoll: PCollection[beam.Row]) -> PCollection[Any]:
        return (
            pcoll
            # | beam.Map(lambda x: beam.Row(foo='foo'))
            | SqlTransform("SELECT * FROM PCOLLECTION")
        )
It is invoked in a unit test as follows:
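from apache_beam.testing.test_pipeline import TestPipeline

with TestPipeline() as p:
    # Parentheses let the pipe chain span several lines.
    (
        p
        | beam.Create(records)
        | MapToBeamRow()
        | FilterValuesInSegment()
        | beam.Map(print)
    )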
Running this code raises the exception java.lang.IllegalArgumentException: Unknown Coder URN beam:coder:pickled_python:v1. Known URNs: [...]
Strangely, if I uncomment the commented-out line in FilterValuesInSegment, the code works. Why am I getting this coder exception?