0

我想始终如一地RicherIndicatorCoder为我的案例类应用自定义RicherIndicator。此外,如果我无法为TuplesKVs包含新的编码器,RicherIndicator那么我希望获得编译时或运行时错误,而不是求助于次优编码器。

然而 Scio 似乎并不尊重@DefaultCoder注释:

@DefaultCoder(classOf[RicherIndicatorCoder]) // Ignored
case class RicherIndicator (
  policy: Policy,
  indicator: Indicator
)

Scio 也不会优先考虑使用 注册的自定义编码器CoderRegistry,而是使用自己的默认编码器:

val registry = sc.pipeline.getCoderRegistry
registry.registerCoderForClass(classOf[RicherIndicator], RicherIndicatorCoder.of) // Not used

因此,我必须在出现这种类型的setCoder(RicherIndicatorCoder.of)任何地方使用SCollection,并仔细梳理管道,以防有复合类型包含RicherIndicator.

有没有办法将我的自定义编码器设置为默认值,或者禁用默认的基于 Magnolia 或 Kryo 的编码器?

4

1 回答 1

0

Java 注解在 Scala 中不起作用。您可以像这样将 Beam 包装Coder为隐式 Scio Coder

implicit val richIndicateCoder = Coder.beam(RicherIndicatorCoder)

只要隐含在范围内,就应该选择它。

于 2019-06-13T15:47:37.610 回答