0

我有以下场景来理解我的问题。

  • 有一个外部节点(节点 A)、一个客户端应用程序(App1)和一个 Hazelcast-Jet 作业应用程序(节点 B,App2)。
  • App1 从 FlatBuffers 收集数据并将其包装在一个 Object HzData 中。HzData 是 DataSerializable 实现的。
  • 然后将 HzData 放入 IMAP 作为 IMAP,称为 hzMap。
  • 首先,我启动节点 A,然后启动 App1。App1 将 hzMap 存入节点 A。
  • 然后,我运行 App2,它依次启动节点 B 并运行 Jet 作业。
  • 关于 App2 的一点点信息,我在 App2 中以相同的包名称拥有相同的 HzData。我在 JobConfig 中添加了与作业相关的所有类。
  • 然后我的管道包含类似于此代码的内容。

BatchSource<Map.Entry<Integer, HzData>> dataBatchSource = Sources.map('hzMap'); BatchStage<HzData> dataBatchStage = pipe.drawFrom(dataBatchSource ). mapUsingContext(ContextFactories.replicatedMapContext ('hzMap'), (map, data) -> data.getValue()); dataBatchStage.drainTo(Sinks.logger()) 上面的报价可以很好地记录所有数据。如果我使用像下面这样的过滤器,它会给我带来问题

dataBatchStage.filter(v -> v.getCheck() == 0).drainTo(Sinks.logger());

以上导致我出现错误,例如,

com.hazelcast.jet.JetException: Exception in ProcessorTasklet{filter#24}: java.lang.ClassCastException: com.nexus.api.portables.HzData cannot be cast to com.nexus.api.portables.HzData

该错误似乎是一个反序列化错误,但我想知道上面的记录器是如何工作的。

我也尝试过使用filterUsingContext()但仍然得到相同的结果。

提前致谢。期待您的宝贵意见和解决方案。

4

0 回答 0