I am trying to test my streams topology using TopologyTestDriver. Below is my configuration:
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
config.put(StreamsConfig.STATE_DIR_CONFIG, "src/test/resources");
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
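The problem I am facing is that the test only works on the second run: if I delete the test-app directory from the resources folder and run the test, the first execution fails but the second execution passes. My topology is as follows: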
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [tpoic1])
      --> KSTREAM-TRANSFORM-0000000001
    Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])
      --> KSTREAM-SINK-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Sink: KSTREAM-SINK-0000000002 (topic: tpoic2)
      <-- KSTREAM-TRANSFORM-0000000001
   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000003 (topics: [tpoic3])
      --> KSTREAM-FLATMAP-0000000004, KSTREAM-TRANSFORM-0000000017
    Processor: KSTREAM-FLATMAP-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KSTREAM-SOURCE-0000000003
    Processor: KSTREAM-TRANSFORM-0000000017 (stores: [])
      --> KSTREAM-SINK-0000000018
      <-- KSTREAM-SOURCE-0000000003
    Sink: KSTREAM-SINK-0000000005 (topic: tpoic4)
      <-- KSTREAM-FLATMAP-0000000004
    Sink: KSTREAM-SINK-0000000018 (topic: tpoic5)
      <-- KSTREAM-TRANSFORM-0000000017
   Sub-topology: 2 for global store (will not generate tasks)
    Source: KSTREAM-SOURCE-0000000006 (topics: [tpoic4])
      --> KTABLE-SOURCE-0000000007
    Processor: KTABLE-SOURCE-0000000007 (stores: [global-tpoic4-view])
      --> none
      <-- KSTREAM-SOURCE-0000000006
   Sub-topology: 3
    Source: KSTREAM-SOURCE-0000000008 (topics: [tpoic2])
      --> KSTREAM-LEFTJOIN-0000000009
    Processor: KSTREAM-LEFTJOIN-0000000009 (stores: [])
      --> KSTREAM-SINK-0000000010
      <-- KSTREAM-SOURCE-0000000008
    Sink: KSTREAM-SINK-0000000010 (topic: tpoic6)
      <-- KSTREAM-LEFTJOIN-0000000009
   Sub-topology: 4
    Source: KSTREAM-SOURCE-0000000011 (topics: [tpoic6])
      --> KSTREAM-KEY-SELECT-0000000012
    Processor: KSTREAM-KEY-SELECT-0000000012 (stores: [])
      --> grouped-fp-repartition-filter
      <-- KSTREAM-SOURCE-0000000011
    Processor: grouped-fp-repartition-filter (stores: [])
      --> grouped-fp-repartition-sink
      <-- KSTREAM-KEY-SELECT-0000000012
    Sink: grouped-fp-repartition-sink (topic: grouped-fp-repartition)
      <-- grouped-fp-repartition-filter
   Sub-topology: 5
    Source: KSTREAM-SOURCE-0000000019 (topics: [tpoic5])
      --> KSTREAM-JOIN-0000000020
    Processor: KSTREAM-JOIN-0000000020 (stores: [grouped-fp])
      --> KSTREAM-SINK-0000000021
      <-- KSTREAM-SOURCE-0000000019
    Source: grouped-fp-repartition-source (topics: [grouped-fp-repartition])
      --> KSTREAM-AGGREGATE-0000000013
    Processor: KSTREAM-AGGREGATE-0000000013 (stores: [grouped-fp])
      --> none
      <-- grouped-fp-repartition-source
    Sink: KSTREAM-SINK-0000000021 (topic: upd-topic)
      <-- KSTREAM-JOIN-0000000020
   Sub-topology: 6
    Source: KSTREAM-SOURCE-0000000022 (topics: [upd-topic])
      --> KSTREAM-FOREACH-0000000023
    Processor: KSTREAM-FOREACH-0000000023 (stores: [])
      --> none
      <-- KSTREAM-SOURCE-0000000022
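
Since the behavior described above depends on whether the test-app directory already exists under the configured state directory, one thing that might be worth trying is giving every test run its own empty state directory. The following is only a minimal sketch under that assumption, not a confirmed fix for the failure. The class name `StreamsTestConfig` and the method `buildConfig` are hypothetical, and the plain string keys are the literal values behind the `StreamsConfig` constants used in the configuration above, so the snippet compiles without the Kafka dependency.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper: builds the same settings as in the question,
// but takes the state directory as a parameter instead of hard-coding it.
public class StreamsTestConfig {

    static Properties buildConfig(Path stateDir) {
        Properties config = new Properties();
        // "application.id" is the value of StreamsConfig.APPLICATION_ID_CONFIG
        config.put("application.id", "test-app");
        // "bootstrap.servers" is the value of StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        config.put("bootstrap.servers", "dummy:9092");
        // "state.dir" is the value of StreamsConfig.STATE_DIR_CONFIG
        config.put("state.dir", stateDir.toString());
        // "processing.guarantee" / "exactly_once" mirror
        // StreamsConfig.PROCESSING_GUARANTEE_CONFIG / StreamsConfig.EXACTLY_ONCE
        config.put("processing.guarantee", "exactly_once");
        return config;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: a fresh temporary directory per run, so no state
        // left over from an earlier run can influence the test.
        Path stateDir = Files.createTempDirectory("kafka-streams-test");
        Properties config = buildConfig(stateDir);
        System.out.println("state.dir = " + config.getProperty("state.dir"));
    }
}
```

In an actual test, the resulting `Properties` would be passed to `new TopologyTestDriver(topology, config)`, and the driver would be closed with `close()` at the end of each test. Whether an isolated state directory changes the first-run behavior here is something to verify rather than assume.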