问题标签 [hazelcast-jet]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
168 浏览

hazelcast-jet - Hazelcast Jet 0.6.1 + Pipeline API + 定制处理器

我正在尝试将自定义处理器附加到 Hazelcast Jet Pipeline 定义。

这是示例代码。

这是 tryProcess() 方法的示例代码

在执行时,我得到了以下异常

有什么遗漏吗?你能帮我解决这个问题吗?

0 投票
1 回答
63 浏览

hazelcast-jet - Hazelcast Jet 0.6.1 DAG 与 PipeLine API

我为 DAG 创建了以下示例代码以了解聚合。看来,slidingWindow顶点不发出任何记录。

不确定,这里出了什么问题..

同样,我为 Pipeline API 创建了以下示例代码以进行聚合。

这很好用。这将打印文本文件中的记录。

请您帮我更正 DAG 定义吗?

0 投票
1 回答
60 浏览

hazelcast-jet - Hazelcast Jet 0.6.1 - Pipeline customTransform API 的编译错误

我收到 Pipeline customTransformAPI 的以下编译错误。

这是构建管道的示例代码:

这是tryProcess()方法的示例代码:

这是编译错误。

这可以使用以下代码编译并运行良好。

但是,下面的代码给出了编译错误。

你能帮我解决这个问题吗?

0 投票
0 回答
108 浏览

java - 如何实现 mapUsingContext() / filterUsingContext() - hazelcast-jet

我有以下场景来理解我的问题。

  • 有一个外部节点(节点 A)、一个客户端应用程序(App1)和一个 Hazelcast-Jet 作业应用程序(节点 B,App2)。
  • App1 从 FlatBuffers 收集数据并将其包装在一个 Object HzData 中。HzData 是 DataSerializable 实现的。
  • 然后将 HzData 放入 IMAP 作为 IMAP,称为 hzMap。
  • 首先,我启动节点 A,然后启动 App1。App1 将 hzMap 存入节点 A。
  • 然后,我运行 App2,它依次启动节点 B 并运行 Jet 作业。
  • 关于 App2 的一点点信息,我在 App2 中以相同的包名称拥有相同的 HzData。我在 JobConfig 中添加了与作业相关的所有类。
  • 然后我的管道包含类似于此代码的内容。

BatchSource<Map.Entry<Integer, HzData>> dataBatchSource = Sources.map('hzMap'); BatchStage<HzData> dataBatchStage = pipe.drawFrom(dataBatchSource ). mapUsingContext(ContextFactories.replicatedMapContext ('hzMap'), (map, data) -> data.getValue()); dataBatchStage.drainTo(Sinks.logger()) 上面的报价可以很好地记录所有数据。如果我使用像下面这样的过滤器,它会给我带来问题

dataBatchStage.filter(v -> v.getCheck() == 0).drainTo(Sinks.logger());

以上导致我出现错误,例如,

com.hazelcast.jet.JetException: Exception in ProcessorTasklet{filter#24}: java.lang.ClassCastException: com.nexus.api.portables.HzData cannot be cast to com.nexus.api.portables.HzData

该错误似乎是一个反序列化错误,但我想知道上面的记录器是如何工作的。

我也尝试过使用filterUsingContext()但仍然得到相同的结果。

提前致谢。期待您的宝贵意见和解决方案。

0 投票
0 回答
59 浏览

hazelcast-jet - Hazelcast Jet - 模式匹配

hazelcast jet 是否支持模式匹配用例,例如很少有偶数流处理器支持。

如果没有,这将包含在未来的版本中。

这是一个用于模式匹配的用例

查找在 10 秒时间范围内卖出“AAA”股票然后买入“BBB”股票的交易者。

0 投票
1 回答
61 浏览

spring-boot - 出现异常时重启 Hazelcast Jet (v0.4)

我们正在使用 Hazelcast Jet 0.4 版本从 Kafka Source 读取消息,处理消息并写入 Kafka。由于 Kafka 由外部团队管理,我们无法控制 Kafka 中抛出的各种异常。

例如,我们收到以下异常:由于组已经重新平衡并分配了分区,因此无法完成提交

当我们收到此错误时,Hazelcast Jet 实例已关闭。所以我们的应用程序变得不可用,我们必须重新启动应用程序。

我们正在研究在这些错误期间自动重新启动 Jet 实例的可能性。

谢谢你的帮助!

0 投票
2 回答
372 浏览

hazelcast-jet - 将作业提交到远程 hazelcast 集群

我是 Hazelcast Jet 的新手,有一个非常基本的问题。我设置了一个 3 节点 JET 集群。我有一个示例代码可以从 Kafka 读取并导出到 IMap。当我从命令行运行它(使用jet-submit.sh并使用JetBootstrap.getInstance()来获取 JET 客户端实例)时,它工作得非常好。当我运行相同的代码(Jet.newJetClient()用于获取实例并在 Eclipse 上运行 -> Java 应用程序)时,我得到:

你能告诉我我哪里错了吗?

0 投票
1 回答
47 浏览

java - 删除 JetInstance 作业

有没有办法,如何从我的 hazelcastInstance 中删除未运行的作业?

我有 stream fromjetInstance.getJobs()并且我需要 return Map<String, Object>,其中 key 是工作的名称。但这是不可能的,因为我已经停止并重新运行了一些作业 - 有更多具有相同名称的实例。

除了删除完成的情况外,我无法为我的情况找到任何可能的解决方案。

我不确定,如果它甚至可能。感谢您提供一些解决方案或提示。

0 投票
1 回答
266 浏览

node.js - Hazelcast Jet 和 Node.JS 客户端序列化问题

我有 Jet 0.6 作为备份和一些安装了 hazelcast-nodejs-client 0.8.0 的 Node.JS 进程。我正在尝试从 Node 进程中推送一个对象,这正是 Jet 端类似对象的反映。但是我不明白如何确保在 Jet 方面这个 JS 对象将分别被序列化/反序列化。我觉得我需要向 Jet 表明这个 JSON 对象是 Data POJO 并且应该使用正确的序列化/反序列化。

在节点方面:

在 Jet 方面:公共类 Data 实现 Serializable {

更新:

我设法将调用堆栈追踪到一个DefaultSerializer.ts显然负责将 JS 对象转换为Data实例以通过MapProxy服务发送到缓存的实例:

0 投票
1 回答
55 浏览

hazelcast-jet - 具有 2 个传入边缘的处理器 - 当在一个边缘返回 false 时,继续从同一边缘重新处理,并且永远不要在另一边缘处理新项目

我要求确认我对 tryProcess() 逻辑的假设。

详细说明返回值(真/假)如何影响处理器上的 DAG 工作流,该处理器具有 2 个未指定优先级的传入边。

我的假设是,如果处理器的两条边都有传入项目,并且一个 tryProcess() 返回 false,则另一边将被处理(如果该边上有更多传入项目可用)。根据哪个边缘停止处理和哪个接受它们来交替传入项目。

问题

有时会发生一个处理器实例阻塞在总是返回 false 的 tryProcess(#0) 上(因为我们希望处理来自其他边缘的新项目)。tryProcess(#0) 被重复调用,而 tryProcess(#1) 从未被调用。我确信无论是#0 还是#1 边缘都不会在处理器上调用completeEdge(),所以我希望从边缘#1 有更多的项目要处理。这通常发生在多次运行同一个 Job 之后。

为了更好地解释这个问题,这是我的用例:

用例

我的数据模型由以下对象组成

  • A:由“ida”属性标识的对象
  • B:由“idb”属性标识的对象。它使用“ida”值引用 A
  • AB:耦合B对象及其引用的A对象的对象

我需要将 B 对象与正确引用的 A 对象匹配并发出其中的几个。

我有一个具有此设置的 DAG:

顶点

  • SA:“A”类型的源项目(localParallelism(1),发出按“ida”属性排序的 A 对象)
  • SB:类型为“B”的源项目(localParallelism(1),发出按引用的“ida”属性排序的 B 个对象)
  • C-AB:将 B 对象与引用的 A 对象匹配的处理器(发出 AB 对象)

连接

  • SA -> C-AB : 传入边缘#0(未指定优先级,由“ida”属性分区)
  • SB -> C-AB : 输入边#1(未指定优先级,通过引用“ida”属性进行分区)

该环境由具有 2 个节点的 hazelcast 喷气机集群组成。

逻辑

C-AB 处理器获取第一个“A”对象(从边缘#0)并保持它直到与该“A”对象相关的所有“B”对象都被处理。如果它接收到另一个“A”对象,它会在 tryProcess(#0) 中返回 false。

当它接收到与当前“A”匹配的“B”对象(来自边缘#1)时,它会发出“AB”。

当处理器接收到一个带有下一个“A”对象的引用的“B”对象时,它会丢弃当前的“A”并等待下一个。

如果它在获得引用的“A”对象之前接收到“B”对象,如果收到新的“B”,则等待正确的“A”匹配 tryProcess(#1) 中返回的 false。

这应该可行,因为 SA 和 SB 发出正确排序的对象,并且边缘被正确分区以将具有相同“ida”值的对象发送到同一处理器。