I have the sample code that builds the pipeline:
private Pipeline buildPipeline() {
    logger.debug("AbstractAuditLogProcessor.buildPipeline method start");
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<String, CacheEntry<AuditLogRecord>>remoteMapJournal(
            "cache_AuditLog", getPlatformClientConfig(), START_FROM_OLDEST))
     .addTimestamps(v -> getTimeStamp(v), 3000)
     .peek()
     .groupingKey(v -> Tuple2.tuple2(getUserID(v), getTranType(v)))
     .window(WindowDefinition.sliding(getSlidingWindowLengthInMillies(), getSlidingStepInMillies()))
     .aggregate(counting())
     .map(v -> getMapKey(v))
     //.<Map.Entry<String, Long>>customTransform("test2", () -> this)
     //.<Offer>customTransform("Offer_Recommendations", () -> this)
     .<Map.Entry<String, Offer>>customTransform("Offer_Recommendations", () -> this)
     //.drainTo(Sinks.remoteList("cache_OfferRecommendations", getPlatformClientConfig()));
     .drainTo(Sinks.remoteMap("cache_OfferRecommendations", getPlatformClientConfig()));
    logger.debug("AbstractAuditLogProcessor.buildPipeline method end");
    return p;
}
This code prints the following DAG information:
dag
.vertex("remoteMapJournalSource(cache_AuditLog)").localParallelism(1)
.vertex("sliding-window-step1").localParallelism(4)
.vertex("sliding-window-step2").localParallelism(4)
.vertex("map").localParallelism(4)
.vertex("Offer_Recommendations").localParallelism(4)
.vertex("remoteMapSink(cache_OfferRecommendations)").localParallelism(1)
.edge(between("remoteMapJournalSource(cache_AuditLog)", "sliding-window-step1").partitioned(?))
.edge(between("sliding-window-step1", "sliding-window-step2").partitioned(?).distributed())
.edge(between("sliding-window-step2", "map"))
.edge(between("map", "Offer_Recommendations"))
.edge(between("Offer_Recommendations", "remoteMapSink(cache_OfferRecommendations)"))
The DAG information contains additional details / method calls such as partitioned() and distributed().
Does this distribute the records based on the key? Likewise, how does Hazelcast Jet ensure that a record does not move to a different partition?
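My current understanding, for context: on a partitioned edge each item is routed by a deterministic function of its key (here the Tuple2 produced by groupingKey), so the same key is always sent to the same processor instance. Below is a simplified, self-contained model of that idea, it is NOT Jet's actual hashing (Jet uses Hazelcast's own partitioning scheme internally); the class and method names are made up for illustration:

```java
import java.util.Objects;

public class PartitionRoutingSketch {
    // Hypothetical key -> partition mapping. Because the mapping is a pure
    // function of the key, repeated records with the same key always land on
    // the same partition; a keyed record can never "move" between partitions.
    static int partitionFor(Object key, int partitionCount) {
        int h = Objects.hashCode(key);
        return Math.floorMod(h, partitionCount); // non-negative even for negative hashes
    }

    public static void main(String[] args) {
        int partitions = 4;
        // Two records with the same (userID, tranType)-style key...
        int p1 = partitionFor("user42|PURCHASE", partitions);
        int p2 = partitionFor("user42|PURCHASE", partitions);
        // ...are routed identically, because routing is deterministic.
        System.out.println(p1 == p2);
    }
}
```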