问题标签 [hazelcast-jet]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
131 浏览

java - 故障后如何正确重新定义/重新启动 Jet 的工作?

Jet 的手册说,一份工作可以提交多次。但是,当我在一些异常后尝试重新启动它时,我会得到像“无法重新启动失败的作业”这样的异常。如果我什么都不做,我不会有任何异常,但计算不会在管道内执行。

提前致谢!

0 投票
1 回答
139 浏览

hazelcast-jet - 如何对异步接收数据的 hazelcast jet 源进行编程

我一直在浏览 hazelcast-jet 文档,以查找对某些外部进程异步馈送的源的引用——在我的例子中是 http 帖子。

我确实查看了Kafka 代码,因为这似乎是最接近的,但无法弄清楚新到达的事件将如何触发任何事情。我假设,这里不会涉及阻塞线程。

我将不胜感激任何指针,以更好地了解如何在“流”元素被滴灌的环境中使用 hazelcast jet。

0 投票
2 回答
48 浏览

hazelcast-jet - DAG 源在 emitFromTraverser 上返回 false 并且处理器在开始处理之前等待源加载的所有元素

用例

HazelcastJet 版本 0.6.1 Hazelcast 版本 3.10.2

鉴于 DAG 的这个(简化版)

顶点

S1 发出 5 个类型 A 项目的源(从带分区的数据库中读取)本地并行度 = 1

发出 150K 类型 B 项目的S2源(从 DB 中批量读取 100 个分区的迭代器) 本地并行度 = 1

AD 处理器,适配 A->A1 和 B->B1 类型并一一发出

FA Processors.filterP 仅接受 A1 类型的项目并一一发出

FB Processors.filterP 仅接受 B1 类型的项目并一一发出

CL 处理器首先累积所有 A1 类型的项目,然后当它收到 B1 类型的项目时,用从适当的 A1 获得的一些人员来丰富它,然后一个接一个地发出。

写入 B1 的WR Sink 本地并行度 = 1

注意: 只是为了给过滤器处理器赋予意义:在 DAG 中有其他源流入同一个适配器 AD,然后使用过滤器处理器进入其他路径。

边缘

S1 --> 广告

S2 --> 广告

AD --> FA (从序数 0)

AD --> FB (从序数 1)

FA --> CL(到序号 0,优先级 0 分发和广播)

FB --> CL(到序号 1,优先级为 1)

CL --> 写

问题

如果源 S2 有“少量”要加载的项目(即 15K),则 emitFromTraverser 永远不会返回 false。

如果源 S2 有“许多”项要加载(即 150K),则 emitFromTraverser 在以下情况下返回 false:

  • 所有 A1 项目已由 CL 处理
  • 大约 30% 的 B1 项目已经传输到 CL,但没有一个被 CL 处理(DiagnosticProcessor 记录该元素已发送到 CL 但未处理)

S2代码供参考:

问题

  • CL 在源代码结束之前不处理项目是否正确?
  • 在 CL Vertex 上使用优先级 + 分布式 + 广播是否正确?

更新

似乎从未调用过 CL 边缘 1 上的 completeEdge。有人可以告诉我为什么吗?

谢谢!

0 投票
0 回答
346 浏览

java - 当我使用 Hazelcast Jet 时,Java 堆大小会永久增长

我的流量不是很大。我通过 Jet 管道每 2 秒传输大约 400 KB 的数据。在此处输入图像描述

我正在使用缓冲区映射通过以下配置通过事件日志启动流:

管道开始于:

缓冲区映射中新值的设置由 set() 方法执行,并替换旧值。

运行几个小时后,我的应用程序达到 xmlx 值,Hazelcast 将关闭。

在此处输入图像描述

您能否检查我是否有错误的配置,或者我应该更详细地分析我的应用程序?我目前的假设是,尽管设置了容量和生存时间,但事件的日志没有被清理。

提前致谢!

UPD:添加了堆转储和图表

在此处输入图像描述

在此处输入图像描述

0 投票
2 回答
204 浏览

hazelcast - getMap() + 同步地图的后续更改

我有一个启用了日志的 IMap。

使用客户端(Hazelcast 或 Jet),我想获得完整的地图,并获得所有后续更新以丰富地图。

我怎么能做到这一点?

如果执行 .getMap(),然后调用 getJournalMap() 或 .addEntryListener(),我担心在 getMap() 和 addEntryListener() 调用之间可能会丢失更新。

有没有更直观的方式来获得完整的地图+更新?谢谢

0 投票
1 回答
96 浏览

hazelcast - Hazelcast Jet 内部优化

我检查了 Hazelcast Jet 以满足我的项目需求,但我发现文档在以下主题方面确实含糊不清:

1)当我对两个列表流执行数据连接时......例如:

那么我能以某种方式告诉 Jet 下面实际会发生什么连接吗?地图侧连接或减少侧连接......?我的意思是想象一下“经纪人”规模很小,而交易规模很大。执行这两组连接的最佳技术是地图侧连接,也就是广播连接......当 Jet 进行连接时,哪些数据将通过网络传输?是否有任何基于大小的优化?

2)我正在测试以下场景:

简单的管道:

list("master")不断被集群中的另一个节点填充。现在,当我将此管道提交到集群时,只有列表的子集(“master”)被排空到记录器。我可以以某种方式将 Jet 作业设置为不断消耗list("master")标准输出吗?

提前致谢

0 投票
1 回答
87 浏览

hazelcast-jet - 如何在不需要 DAG 的情况下在 Hazelcast Jet 中使用 Twitter 流源?

我想对推文的实时流进行简单分析。

如何在不需要 DAG 的情况下在 Hazelcast Jet 中使用 Twitter 流源?

细节

Twitter API 的封装在StreamTwitterP.java中做得很好。

但是,调用者将其用作 DAG 的一部分,c/o:

我的用例不需要 DAG 的强大功能,所以我宁愿避免这种不必要的额外复杂性。

为了避免 DAG,我希望使用 SourceBuilder 为实时推文流定义一个新的数据源。

我假设它的代码类似于上面提到的 StreamTwitterP.java,但是我不清楚使用 Hazelcast JET 的 API 是否合适。

我指的是文档中的SourceBuilder 示例

0 投票
2 回答
72 浏览

hazelcast-jet - 来自不同来源的多个流之间的完全连接

我正在使用 hazelcast jet 0.6.1 进行实时分析。有来自不同来源的多个流(主要来自远程日志)。

我想知道,多个流之间是否支持完全连接。

如果是,请您指出一些链接/示例,以便在多个流之间进行完全连接。

0 投票
1 回答
113 浏览

hazelcast-jet - 如何确保按顺序处理具有相同键的元组

我已经使用 Hazelcast Jet 将 IoT 测量流转换为警报流。

因此,只要一个传感器的湿度水平超过阈值,就会发出警报。当它再次低于阈值时,警报被清除。最多可以有 3 个级别的阈值(严重性)。

目前,我在工作开始时遇到问题。它将刷新我的 RabbitMQ 源中的所有缓冲事件。因此,far 事件是有序的,因为本地并行性是一个(让我们在这里假设一个成员集群)。但是我们将事件分派到协作线程池中,订单上没有保证。我可以指示 Jet 按顺序处理具有相同传感器 ID 的所有事件吗?

这是我的管道的当前定义:

checkNotification 将事件的严重性与此传感器的最新严重性进行比较。这就是为什么顺序很重要。


我尝试实施 Gokhan Oner 建议的解决方案:我修改了源以输出 SimpleMeasurement 对象。这样我就可以在源之后添加时间戳。

使用此代码,对于相同的传感器 ID,事件仍未按顺序进行处理。此外,从源读取事件到在“checkNotification”中处理它有 20 秒的延迟。

0 投票
1 回答
117 浏览

hazelcast-jet - Is it possible that Sources.mapJournal() is slow for frequently updating of IMap?

I'm trying to emulate the stream drawing from Sources.mapJournal through IMap which receives data from IoT device. The processing of this stream is too slow and I'm getting the big accumulated outcome after 30-60 seconds.

When I started to update the IMap frequently with small data (12 KB per value), the exception is:

com.hazelcast.ringbuffer.StaleSequenceException: sequence:123 is too small and data store is disabled.

I increased the default capacity of IMap journal 10 times. It became stable after that, but very slow. A similar issue is when I'm updating the IMap with big values (about 1.2 MB per 5 seconds). Additionally I have several connected IoT devices and each of them has its own Jet job with the same pipeline:

Thanks in advance!

UPD:

  • The journal size is EventJournalConfig.DEFAULT_CAPACITY * 10 = 100 000 (1 partition)
  • Jet version is 0.7.2
  • Serialazable classes implements com.hazelcast.nio.serialization.IdentifiedDataSerializable