Hazelcast Jet 在控制台上打印 DAG 定义,一旦启动
这会将管道定义转换为 DAG。
这是一个管道定义。
private Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String, Record>remoteMapJournal("record", getClientConfig(), START_FROM_OLDEST))
.addTimestamps((v) -> getTimeStamp(v), 3000)
.peek()
.groupingKey((v) -> Tuple2.tuple2(getUserID(v),getTranType(v)))
.window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
.aggregate(counting())
.map((v)-> getMapKey(v))
.drainTo(Sinks.remoteMap("Test", getClientConfig()));
return p;
}
这是打印在控制台上的 DAG 定义。
.vertex("remoteMapJournalSource(record)").localParallelism(1)
.vertex("sliding-window-step1").localParallelism(4)
.vertex("sliding-window-step2").localParallelism(4)
.vertex("map").localParallelism(4)
.vertex("remoteMapSink(Test)").localParallelism(1)
.edge(between("remoteMapJournalSource(record)", "sliding-window-step1").partitioned(?))
.edge(between("sliding-window-step1", "sliding-window-step2").partitioned(?).distributed())
.edge(between("sliding-window-step2", "map"))
.edge(between("map", "remoteMapSink(Test)"))
有没有办法获得包含所有细节的 DAG 定义,如滑动窗口细节、聚合 API 等?