I have the sample code that builds the pipeline.

    private Pipeline buildPipeline() {
        logger.debug("AbstractAuditLogProcessor.buildPipeline method start");
        Pipeline p = Pipeline.create();

        p.drawFrom(Sources.<String, CacheEntry<AuditLogRecord>>remoteMapJournal("cache_AuditLog", getPlatformClientConfig(), START_FROM_OLDEST))
                // assign event timestamps with an allowed lag of 3000 ms
                .addTimestamps((v) -> getTimeStamp(v), 3000)
                .peek()
                // group by (userID, tranType) so each key is windowed and aggregated independently
                .groupingKey((v) -> Tuple2.tuple2(getUserID(v), getTranType(v)))
                .window(WindowDefinition.sliding(getSlidingWindowLengthInMillies(), getSlidingStepInMillies()))
                .aggregate(counting())
                .map((v) -> getMapKey(v))
                //.<Map.Entry<String, Long>>customTransform("test2", () -> this)
                //.<Offer>customTransform("Offer_Recommendations", () -> this)
                .<Map.Entry<String, Offer>>customTransform("Offer_Recommendations", () -> this)
                //.drainTo(Sinks.remoteList("cache_OfferRecommendations", getPlatformClientConfig()));
                .drainTo(Sinks.remoteMap("cache_OfferRecommendations", getPlatformClientConfig()));

        logger.debug("AbstractAuditLogProcessor.buildPipeline method end");
        return p;
    }
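
For context, a minimal sketch of how such a pipeline could be submitted to a Jet cluster (assumptions: an embedded Jet member is used here; `getPlatformClientConfig()` and the other helpers above are the author's own code and not shown):

    // Sketch only. Uses com.hazelcast.jet.Jet and com.hazelcast.jet.JetInstance.
    JetInstance jet = Jet.newJetInstance();
    try {
        // newJob() translates the Pipeline into the DAG shown below and starts executing it
        jet.newJob(buildPipeline()).join();  // join() blocks while the streaming job runs
    } finally {
        jet.shutdown();
    }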

The pipeline above prints the following DAG information:

    dag
    .vertex("remoteMapJournalSource(cache_AuditLog)").localParallelism(1)
    .vertex("sliding-window-step1").localParallelism(4)
    .vertex("sliding-window-step2").localParallelism(4)
    .vertex("map").localParallelism(4)
    .vertex("Offer_Recommendations").localParallelism(4)
    .vertex("remoteMapSink(cache_OfferRecommendations)").localParallelism(1)
    .edge(between("remoteMapJournalSource(cache_AuditLog)", "sliding-window-step1").partitioned(?))
    .edge(between("sliding-window-step1", "sliding-window-step2").partitioned(?).distributed())
    .edge(between("sliding-window-step2", "map"))
    .edge(between("map", "Offer_Recommendations"))
    .edge(between("Offer_Recommendations", "remoteMapSink(cache_OfferRecommendations)"))

The DAG information contains additional details/method calls such as partitioned() and distributed().

Does this distribute the records based on the key? Also, how does Hazelcast Jet ensure that a record does not move to a different partition?
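
To make the question concrete, here is a conceptual sketch (not Jet's actual implementation) of what I understand a partitioned edge to do: items with the same grouping key are deterministically mapped to the same partition, so per-key windowed state never moves. The key string and class name below are made up for illustration:

    import java.util.Objects;

    public class KeyPartitioningSketch {

        // Conceptual stand-in for a partitioned edge's key-to-partition mapping:
        // a deterministic hash of the grouping key modulo the partition count,
        // so the same key always lands on the same partition/processor.
        static int partitionFor(Object groupingKey, int partitionCount) {
            return Math.floorMod(Objects.hashCode(groupingKey), partitionCount);
        }

        public static void main(String[] args) {
            int partitionCount = 271; // Hazelcast's default partition count
            // The (userID, tranType) key from the pipeline would always map to the
            // same partition index, which is why per-key windowed state stays put.
            System.out.println(partitionFor("user42|PURCHASE", partitionCount));
            System.out.println(partitionFor("user42|PURCHASE", partitionCount)); // same value
        }
    }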
