我想始终如一地RicherIndicatorCoder
为我的案例类应用自定义RicherIndicator
。此外,如果我无法为Tuples
或KVs
包含新的编码器,RicherIndicator
那么我希望获得编译时或运行时错误,而不是求助于次优编码器。
然而 Scio 似乎并不尊重@DefaultCoder
注释:
@DefaultCoder(classOf[RicherIndicatorCoder]) // Ignored
case class RicherIndicator (
policy: Policy,
indicator: Indicator
)
Scio 也不会优先考虑使用 注册的自定义编码器CoderRegistry
,而是使用自己的默认编码器:
val registry = sc.pipeline.getCoderRegistry
registry.registerCoderForClass(classOf[RicherIndicator], RicherIndicatorCoder.of) // Not used
因此,我必须在出现这种类型的setCoder(RicherIndicatorCoder.of)
任何地方使用SCollection
,并仔细梳理管道,以防有复合类型包含RicherIndicator
.
有没有办法将我的自定义编码器设置为默认值,或者禁用默认的基于 Magnolia 或 Kryo 的编码器?