问题标签 [hazelcast-jet]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
212 浏览

java - 是否可以嵌套 Hazelcast JET 管道,以便内部管道可以计算外部管道的结果?

考虑以下场景:

我们想要获取一个大型分布式对象集合,并且对于集合中的每个对象,我们想要启动另一个计算,该计算使用当前对象和另一个大型分布式集合来计算转换当前对象的结果。

例如

集合A:1、2、3、4、5、6、7、8……

集合 B:1,2,3,4,5,6,7,8……

对于 A 中的每个值,我们迭代 B 中的所有值,将每个值乘以 2 并对这些值求和,我们将 A 中的每个值映射到该总和乘以当前 A 值。

以下是我的尝试,当使用以下内容时会导致死锁:

使用以下内容时不会出现死锁:

,但是我们希望 p2 完成以确保我们得到正确的总和。

对于这个特定的用例,这似乎是一种使用 Jet 的非惯用方式,但是我想使用这种模式来解决其他问题,因此非常感谢您的帮助。

0 投票
1 回答
120 浏览

hazelcast-jet - Hazelcast-jet 的最大聚合

我想对整个数据集做一个简单的最大值。我从 Kafka 示例开始:https ://github.com/hazelcast/hazelcast-jet-code-samples/blob/0.7-maintenance/kafka/src/main/java/avro/KafkaAvroSource.java

我刚刚将管道更改为:

然后去:

获得话题中年龄最大的人。然而,它不返回 20 并且似乎在不同的运行中返回不同的值。知道为什么吗?

0 投票
0 回答
177 浏览

hazelcast-jet - 使用 Hazelcast-jet 将大型 Kafka 主题读入地图

我有一个 Kafka 主题,从大约 100GB 开始,我尝试使用 Hazelcast-jet 将其读入 IMap。这台机器有足够的内存,我给了它 300 GB 的堆。该主题被划分为 147 个分区,但是当我运行代码告诉管道以“最早”从主题中读取本地并行度设置为 84 时,该进程似乎没有使用很多内核,并且运行一段时间后不会地图中的条目数量应该接近于地图中的条目数量(与同时摄取到 Elastic 搜索中的数据相比)。现在该主题已经超过 500GB,我预计该进程最终会耗尽内存,但它似乎仍然没有使用很多内核并且只加载一小部分数据。

有谁知道为什么会这样?

0 投票
1 回答
243 浏览

hazelcast - Hazelcast Jet 卡在启动 Job 上

我在 Hazelcast Jet 中遇到了奇怪的行为。我同时开始了许多工作(约 30 个,有些是在其他之前触发的)。但是,当我的 Hazelcast Jet 作业计数达到 26(幻数?)时,所有处理都卡住了。

在线程中,我看到以下信息:

并且:

我没有任何其他信息如何重现这个问题,但是我希望有人知道如何解决这个问题,或者我的问题会帮助其他人:)

我的设置: - Java 11 - Hazelcast 3.12 快照 - Hazelcast Jet 3.0 快照(我无法恢复到以前的版本,它会破坏我的逻辑;我需要将在 3.1 中添加的 n:m 连接)- CPU 内核:4 - RAM:7 GB - Jet 模式:服务器,作为客户端连接到其他集群以插入最终数据。

有没有人遇到过类似的问题?问题是,它不能简单地复制,因此很难为 Hazelcast 团队制造问题。只有线程转储和一般行为可以提示正在发生的事情。

0 投票
1 回答
346 浏览

hazelcast-imap - 获取和异常NotSerializableException:com.hazelcast.map.impl.proxy.MapProxyImpl

我是 hazelcast-jet 的新手,我的用例是在检查 hazelcastIMDG 中的值后从 Kafka 源读取和过滤。

我什至在创建管道之前就正在获取并加载 IMDG 地图。见下文

在 buildPipeline 方法中将 policyMap 作为参数传递。

我创建了如下管道

但有了这个我得到了例外

线程“main”java.lang.IllegalArgumentException 中的异常:“filterFn”必须在 com.hazelcast.jet.impl.pipeline 的 com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:301) 处可序列化。 ComputeStageImplBase.attachFilter(ComputeStageImplBase.java:129) 在 com.hazelcast.jet.impl.pipeline.StreamStageImpl.filter(StreamStageImpl.java:71) 在 com.visa.rls.handler.HazelcastJetIngetstResultHandler.buildPipeline(HazelcastJetIngetstResultHandler.java:120)在 com.visa.rls.handler.HazelcastJetIngetstResultHandler.run(HazelcastJetIngetstResultHandler.java:84) 在 com.visa.rls.handler.HazelcastJetIngetstResultHandler.main(HazelcastJetIngetstResultHandler.java:58) 引起:java.io.NotSerializableException: com.hazelcast .map.impl.proxy.MapProxyImpl 在 java.io.ObjectOutputStream。writeObject0(ObjectOutputStream.java:1184) 在 java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java :1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java .io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 在 com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:299) ... 5 更多writeObject0(ObjectOutputStream.java:1174) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java :1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 在 com.hazelcast.jet.impl.util.Util.checkSerializable(Util. java:299) ... 还有 5 个writeObject0(ObjectOutputStream.java:1174) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java :1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 在 com.hazelcast.jet.impl.util.Util.checkSerializable(Util. java:299) ... 还有 5 个writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:299) ... 还有 5 个writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:299) ... 还有 5 个

0 投票
1 回答
129 浏览

hazelcast - Hazelcast Jet 是否支持滚动数字作为 IMap 键,以 Kafka 作为源?

我前段时间使用过 Hazelcast,而且我是第一次使用 Hazelcast Jet,对于处理一些实时流式传输,进行探索似乎很有趣。

在这里,我有一种情况,我正在Kafka topic使用IMap

好吧,我没有这个话题的关键。我应该可以选择将滚动号码分配为密钥吗?如果是这样,请帮助我使用该技术。谢谢。

0 投票
2 回答
160 浏览

java - Hazelcast Jet Pipeline 中“withIngestionTimestamps()”的确切用途是什么?

我正在运行一个管道,从Kafka主题源和接收器到IMap. 每次我写一个,我都会遇到这些方法withIngestionTimestamps()withoutTimestamps()想知道它们有什么用?我了解它的所有关于为事件添加时间的来源。问题是我如何使用它?我没有看到任何从事件中获取时间戳的方法?

我的 IMap 有可能被重复值填充。如果我可以使用 withIngestionTimestamps() 方法来评估最新记录并丢弃旧记录?

0 投票
0 回答
223 浏览

java - Hazelcast-Jet 客户端部署抛出 java.lang.NoClassDefFoundError(org/apache/kafka/clients/consumer/ConsumerRecord)

我已经启动了 Hazelcast-Jet(jet-start.sh) 的 3 个实例。可以看到集群形成为 3 个节点。

在其中一个集群节点中,我尝试pipeline使用JetClientto Source from KafkatoIMap和 source from IMapto运行很少的Kafka。这是代码:

虽然代码中没有,但我正在使用org/apache/kafka/clients/consumer/ConsumerRecord,在运行这个 jar 时,我收到了这个错误:

虽然我添加了 的依赖项kafka-clients,但仍然遇到此错误。这是 pom.xml:

也添加kafka-clients.jar/Hazelcast-jet-3.0/lib文件夹中。想知道我是否应该开始Hazelcast IMDG

有人请帮助和纠正。谢谢。

0 投票
1 回答
152 浏览

java - Hazelcast-Jet 与 Kafka 源和接收器管道的稳定性如何?

我正在运行 Hazelcast-Jet 的 3 节点嵌入式集群,控制台中经常出现以下错误。可能的原因是什么?

有人可以帮我理解吗?

来自 Kafka 源和接收器的数据也不一致。

0 投票
1 回答
236 浏览

hazelcast-jet - Hazelcast Jet - 按用例分组

我们需要在庞大的数据集上以动态方式按多个字段进行分组。数据存储在 Hazelcast Jet 集群中。示例:如果Person类包含 4 个字段:agename和。我们首先需要按城市分组,然后按国家分组,然后我们可以根据条件参数按名称分组。citycountry

我们已经尝试过使用分布式收集但无法正常工作。即使我们尝试使用 Pipeline API,它也会抛出错误。

代码:

然后我们在客户端读取地图并销毁它。

服务器上的错误消息:

原因:java.lang.ClassCastException:java.util.HashMap 无法转换为 java.util.Map$Entry