问题标签 [hazelcast-jet]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
hazelcast-jet - Hazelcast Jet 0.6.1 + Pipeline API + 定制处理器
我正在尝试将自定义处理器附加到 Hazelcast Jet Pipeline 定义。
这是示例代码。
这是 tryProcess() 方法的示例代码
在执行时,我得到了以下异常
有什么遗漏吗?你能帮我解决这个问题吗?
hazelcast-jet - Hazelcast Jet 0.6.1 DAG 与 PipeLine API
我为 DAG 创建了以下示例代码以了解聚合。看来,slidingWindow
顶点不发出任何记录。
不确定,这里出了什么问题..
同样,我为 Pipeline API 创建了以下示例代码以进行聚合。
这很好用。这将打印文本文件中的记录。
请您帮我更正 DAG 定义吗?
hazelcast-jet - Hazelcast Jet 0.6.1 - Pipeline customTransform API 的编译错误
我收到 Pipeline customTransform
API 的以下编译错误。
这是构建管道的示例代码:
这是tryProcess()
方法的示例代码:
这是编译错误。
这可以使用以下代码编译并运行良好。
但是,下面的代码给出了编译错误。
你能帮我解决这个问题吗?
java - 如何实现 mapUsingContext() / filterUsingContext() - hazelcast-jet
我有以下场景来理解我的问题。
- 有一个外部节点(节点 A)、一个客户端应用程序(App1)和一个 Hazelcast-Jet 作业应用程序(节点 B,App2)。
- App1 从 FlatBuffers 收集数据并将其包装在一个 Object HzData 中。HzData 是 DataSerializable 实现的。
- 然后将 HzData 放入 IMAP 作为 IMAP,称为 hzMap。
- 首先,我启动节点 A,然后启动 App1。App1 将 hzMap 存入节点 A。
- 然后,我运行 App2,它依次启动节点 B 并运行 Jet 作业。
- 关于 App2 的一点点信息,我在 App2 中以相同的包名称拥有相同的 HzData。我在 JobConfig 中添加了与作业相关的所有类。
- 然后我的管道包含类似于此代码的内容。
BatchSource<Map.Entry<Integer, HzData>> dataBatchSource = Sources.map('hzMap');
BatchStage<HzData> dataBatchStage = pipe.drawFrom(dataBatchSource ).
mapUsingContext(ContextFactories.replicatedMapContext
('hzMap'),
(map, data) -> data.getValue());
dataBatchStage.drainTo(Sinks.logger())
上面的报价可以很好地记录所有数据。如果我使用像下面这样的过滤器,它会给我带来问题
dataBatchStage.filter(v -> v.getCheck() == 0).drainTo(Sinks.logger());
以上导致我出现错误,例如,
com.hazelcast.jet.JetException: Exception in ProcessorTasklet{filter#24}: java.lang.ClassCastException: com.nexus.api.portables.HzData cannot be cast to com.nexus.api.portables.HzData
该错误似乎是一个反序列化错误,但我想知道上面的记录器是如何工作的。
我也尝试过使用filterUsingContext()
但仍然得到相同的结果。
提前致谢。期待您的宝贵意见和解决方案。
hazelcast-jet - Hazelcast Jet - 模式匹配
hazelcast jet 是否支持模式匹配用例,例如很少有偶数流处理器支持。
如果没有,这将包含在未来的版本中。
这是一个用于模式匹配的用例
查找在 10 秒时间范围内卖出“AAA”股票然后买入“BBB”股票的交易者。
spring-boot - 出现异常时重启 Hazelcast Jet (v0.4)
我们正在使用 Hazelcast Jet 0.4 版本从 Kafka Source 读取消息,处理消息并写入 Kafka。由于 Kafka 由外部团队管理,我们无法控制 Kafka 中抛出的各种异常。
例如,我们收到以下异常:由于组已经重新平衡并分配了分区,因此无法完成提交
当我们收到此错误时,Hazelcast Jet 实例已关闭。所以我们的应用程序变得不可用,我们必须重新启动应用程序。
我们正在研究在这些错误期间自动重新启动 Jet 实例的可能性。
谢谢你的帮助!
hazelcast-jet - 将作业提交到远程 hazelcast 集群
我是 Hazelcast Jet 的新手,有一个非常基本的问题。我设置了一个 3 节点 JET 集群。我有一个示例代码可以从 Kafka 读取并导出到 IMap。当我从命令行运行它(使用jet-submit.sh
并使用JetBootstrap.getInstance()
来获取 JET 客户端实例)时,它工作得非常好。当我运行相同的代码(Jet.newJetClient()
用于获取实例并在 Eclipse 上运行 -> Java 应用程序)时,我得到:
你能告诉我我哪里错了吗?
java - 删除 JetInstance 作业
有没有办法,如何从我的 hazelcastInstance 中删除未运行的作业?
我有 stream fromjetInstance.getJobs()
并且我需要 return Map<String, Object>
,其中 key 是工作的名称。但这是不可能的,因为我已经停止并重新运行了一些作业 - 有更多具有相同名称的实例。
除了删除完成的情况外,我无法为我的情况找到任何可能的解决方案。
我不确定,如果它甚至可能。感谢您提供一些解决方案或提示。
node.js - Hazelcast Jet 和 Node.JS 客户端序列化问题
我有 Jet 0.6 作为备份和一些安装了 hazelcast-nodejs-client 0.8.0 的 Node.JS 进程。我正在尝试从 Node 进程中推送一个对象,这正是 Jet 端类似对象的反映。但是我不明白如何确保在 Jet 方面这个 JS 对象将分别被序列化/反序列化。我觉得我需要向 Jet 表明这个 JSON 对象是 Data POJO 并且应该使用正确的序列化/反序列化。
在节点方面:
在 Jet 方面:公共类 Data 实现 Serializable {
更新:
我设法将调用堆栈追踪到一个DefaultSerializer.ts
显然负责将 JS 对象转换为Data
实例以通过MapProxy
服务发送到缓存的实例:
hazelcast-jet - 具有 2 个传入边缘的处理器 - 当在一个边缘返回 false 时,继续从同一边缘重新处理,并且永远不要在另一边缘处理新项目
我要求确认我对 tryProcess() 逻辑的假设。
详细说明返回值(真/假)如何影响处理器上的 DAG 工作流,该处理器具有 2 个未指定优先级的传入边。
我的假设是,如果处理器的两条边都有传入项目,并且一个 tryProcess() 返回 false,则另一边将被处理(如果该边上有更多传入项目可用)。根据哪个边缘停止处理和哪个接受它们来交替传入项目。
问题
有时会发生一个处理器实例阻塞在总是返回 false 的 tryProcess(#0) 上(因为我们希望处理来自其他边缘的新项目)。tryProcess(#0) 被重复调用,而 tryProcess(#1) 从未被调用。我确信无论是#0 还是#1 边缘都不会在处理器上调用completeEdge(),所以我希望从边缘#1 有更多的项目要处理。这通常发生在多次运行同一个 Job 之后。
为了更好地解释这个问题,这是我的用例:
用例
我的数据模型由以下对象组成
- A:由“ida”属性标识的对象
- B:由“idb”属性标识的对象。它使用“ida”值引用 A
- AB:耦合B对象及其引用的A对象的对象
我需要将 B 对象与正确引用的 A 对象匹配并发出其中的几个。
我有一个具有此设置的 DAG:
顶点
- SA:“A”类型的源项目(localParallelism(1),发出按“ida”属性排序的 A 对象)
- SB:类型为“B”的源项目(localParallelism(1),发出按引用的“ida”属性排序的 B 个对象)
- C-AB:将 B 对象与引用的 A 对象匹配的处理器(发出 AB 对象)
连接
- SA -> C-AB : 传入边缘#0(未指定优先级,由“ida”属性分区)
- SB -> C-AB : 输入边#1(未指定优先级,通过引用“ida”属性进行分区)
该环境由具有 2 个节点的 hazelcast 喷气机集群组成。
逻辑
C-AB 处理器获取第一个“A”对象(从边缘#0)并保持它直到与该“A”对象相关的所有“B”对象都被处理。如果它接收到另一个“A”对象,它会在 tryProcess(#0) 中返回 false。
当它接收到与当前“A”匹配的“B”对象(来自边缘#1)时,它会发出“AB”。
当处理器接收到一个带有下一个“A”对象的引用的“B”对象时,它会丢弃当前的“A”并等待下一个。
如果它在获得引用的“A”对象之前接收到“B”对象,如果收到新的“B”,则等待正确的“A”匹配 tryProcess(#1) 中返回的 false。
这应该可行,因为 SA 和 SB 发出正确排序的对象,并且边缘被正确分区以将具有相同“ida”值的对象发送到同一处理器。