我有以下场景来理解我的问题。
- 有一个外部节点(节点 A)、一个客户端应用程序(App1)和一个 Hazelcast-Jet 作业应用程序(节点 B,App2)。
- App1 从 FlatBuffers 收集数据并将其包装在一个 Object HzData 中。HzData 是 DataSerializable 实现的。
- 然后将 HzData 放入 IMAP 作为 IMAP,称为 hzMap。
- 首先,我启动节点 A,然后启动 App1。App1 将 hzMap 存入节点 A。
- 然后,我运行 App2,它依次启动节点 B 并运行 Jet 作业。
- 关于 App2 的一点点信息,我在 App2 中以相同的包名称拥有相同的 HzData。我在 JobConfig 中添加了与作业相关的所有类。
- 然后我的管道包含类似于此代码的内容。
BatchSource<Map.Entry<Integer, HzData>> dataBatchSource = Sources.map('hzMap');
BatchStage<HzData> dataBatchStage = pipe.drawFrom(dataBatchSource ).
mapUsingContext(ContextFactories.replicatedMapContext
('hzMap'),
(map, data) -> data.getValue());
dataBatchStage.drainTo(Sinks.logger())
上面的报价可以很好地记录所有数据。如果我使用像下面这样的过滤器,它会给我带来问题
dataBatchStage.filter(v -> v.getCheck() == 0).drainTo(Sinks.logger());
以上导致我出现错误,例如,
com.hazelcast.jet.JetException: Exception in ProcessorTasklet{filter#24}: java.lang.ClassCastException: com.nexus.api.portables.HzData cannot be cast to com.nexus.api.portables.HzData
该错误似乎是一个反序列化错误,但我想知道上面的记录器是如何工作的。
我也尝试过使用filterUsingContext()
但仍然得到相同的结果。
提前致谢。期待您的宝贵意见和解决方案。