我正在尝试将我几周前编写的 hazelcast Jet 0.3 DAG 系统重新设计为 v0.4,作为将其从批处理更改为流的第一步。有趣的是,突然间我遇到了一些奇怪的行为,我无法确定顶点是否按预期工作。试图了解正在发生的事情,我找不到如何窥视每个顶点内部工作的选项。有没有办法至少从中获取一些错误消息?
为了隔离问题,我试图将其简化为一个非常简单的“从列表读取,将其映射到映射写入映射”DAG。但是仍然没有成功。
在我愚蠢的例子下面,也许我犯了一个非常简单的错误,更有知识的人会马上看到?
出版商:
// every second via executorservice:
final IStreamMap<Long, List<byte[]>> data = jet.getMap("data");
data.set(jet.getHazelcastInstance().getAtomicLong("key").getAndIncrement(), myByteArray);
分析仪:
jet.getList(key.toString()).addAll((List<byte[]>) jet.getMap("data").get(key));
jet.getMap("data").remove(key);
logger.debug("List {} has size: {}", key, jet.getList(key.toString()).size());
final Vertex sourceDataMap = this.newVertex("sourceDataMap", readList(key.toString())).localParallelism(1);
final Vertex parseByteArrayToMap = this.newVertex("parseByteArrayToMap", map(
(byte[] e) -> new AbstractMap.SimpleEntry<>(jet.getHazelcastInstance().getAtomicLong("counter").getAndIncrement(), e)));
final Vertex sinkIntoResultMap = this.newVertex("sinkIntoResultMap", writeMap("result"));
this.edge(between(sourceDataMap, parseByteArrayToMap))
.edge(between(parseByteArrayToMap, sinkIntoResultMap));
听众:
jet.getMap("result").addEntryListener((EntryAddedListener<Long, byte[]>)
(EntryEvent<Long, byte[]> entryEvent)
-> logger.debug("Got result: {} at {}",entryEvent.getValue().length, System.currentTimeMillis())
, true);
数据生成工作正常,一切正常,直到 DAG 接管......但没有错误消息或来自 DAG 的任何内容。有什么建议么?