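I am trying to test my streams topology with TopologyTestDriver. Below is the configuration I use:

config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
config.put(StreamsConfig.STATE_DIR_CONFIG, "src/test/resources");
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

The problem is that the test only passes on the second run: if I delete the test-app directory from the resources directory and run the test, the first execution fails, but the second one passes. My topology is as follows: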

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [tpoic1])
      --> KSTREAM-TRANSFORM-0000000001
    Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])
      --> KSTREAM-SINK-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Sink: KSTREAM-SINK-0000000002 (topic: tpoic2)
      <-- KSTREAM-TRANSFORM-0000000001

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000003 (topics: [tpoic3])
      --> KSTREAM-FLATMAP-0000000004, KSTREAM-TRANSFORM-0000000017
    Processor: KSTREAM-FLATMAP-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KSTREAM-SOURCE-0000000003
    Processor: KSTREAM-TRANSFORM-0000000017 (stores: [])
      --> KSTREAM-SINK-0000000018
      <-- KSTREAM-SOURCE-0000000003
    Sink: KSTREAM-SINK-0000000005 (topic: tpoic4)
      <-- KSTREAM-FLATMAP-0000000004
    Sink: KSTREAM-SINK-0000000018 (topic: tpoic5)
      <-- KSTREAM-TRANSFORM-0000000017

  Sub-topology: 2 for global store (will not generate tasks)
    Source: KSTREAM-SOURCE-0000000006 (topics: [tpoic4])
      --> KTABLE-SOURCE-0000000007
    Processor: KTABLE-SOURCE-0000000007 (stores: [global-tpoic4-view])
      --> none
      <-- KSTREAM-SOURCE-0000000006
  Sub-topology: 3
    Source: KSTREAM-SOURCE-0000000008 (topics: [tpoic2])
      --> KSTREAM-LEFTJOIN-0000000009
    Processor: KSTREAM-LEFTJOIN-0000000009 (stores: [])
      --> KSTREAM-SINK-0000000010
      <-- KSTREAM-SOURCE-0000000008
    Sink: KSTREAM-SINK-0000000010 (topic: tpoic6)
      <-- KSTREAM-LEFTJOIN-0000000009

  Sub-topology: 4
    Source: KSTREAM-SOURCE-0000000011 (topics: [tpoic6])
      --> KSTREAM-KEY-SELECT-0000000012
    Processor: KSTREAM-KEY-SELECT-0000000012 (stores: [])
      --> grouped-fp-repartition-filter
      <-- KSTREAM-SOURCE-0000000011
    Processor: grouped-fp-repartition-filter (stores: [])
      --> grouped-fp-repartition-sink
      <-- KSTREAM-KEY-SELECT-0000000012
    Sink: grouped-fp-repartition-sink (topic: grouped-fp-repartition)
      <-- grouped-fp-repartition-filter

  Sub-topology: 5
    Source: KSTREAM-SOURCE-0000000019 (topics: [tpoic5])
      --> KSTREAM-JOIN-0000000020
    Processor: KSTREAM-JOIN-0000000020 (stores: [grouped-fp])
      --> KSTREAM-SINK-0000000021
      <-- KSTREAM-SOURCE-0000000019
    Source: grouped-fp-repartition-source (topics: [grouped-fp-repartition])
      --> KSTREAM-AGGREGATE-0000000013
    Processor: KSTREAM-AGGREGATE-0000000013 (stores: [grouped-fp])
      --> none
      <-- grouped-fp-repartition-source
    Sink: KSTREAM-SINK-0000000021 (topic: upd-topic)
      <-- KSTREAM-JOIN-0000000020

  Sub-topology: 6
    Source: KSTREAM-SOURCE-0000000022 (topics: [upd-topic])
      --> KSTREAM-FOREACH-0000000023
    Processor: KSTREAM-FOREACH-0000000023 (stores: [])
      --> none
      <-- KSTREAM-SOURCE-0000000022
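
Because the outcome depends on whether the test-app directory already exists under the state directory, one way to narrow this down is to give every run a fresh, empty state directory. The sketch below is only an illustration under assumptions: it uses the plain string values behind the StreamsConfig constants ("application.id", "bootstrap.servers", "state.dir", "processing.guarantee", "exactly_once") so that it compiles without the Kafka dependency, and the class name, method name, and temp-directory prefix are hypothetical. It does not by itself explain the first-run failure; it only makes each run start from the same empty state.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper: builds the properties from the question, but points
// state.dir at a new temporary directory instead of src/test/resources.
public class TestDriverConfigSketch {

    static Properties buildConfig() throws IOException {
        // A new, empty directory is created on every call, so no state
        // from an earlier test run can be picked up.
        Path stateDir = Files.createTempDirectory("kafka-streams-test-");

        Properties config = new Properties();
        // String literals stand in for the StreamsConfig constants.
        config.put("application.id", "test-app");
        config.put("bootstrap.servers", "dummy:9092");
        config.put("state.dir", stateDir.toString());
        config.put("processing.guarantee", "exactly_once");
        return config;
    }

    public static void main(String[] args) throws IOException {
        Properties config = buildConfig();
        // Prints the temporary state directory chosen for this run.
        System.out.println(config.getProperty("state.dir"));
    }
}
```

The returned Properties object could then be passed to the TopologyTestDriver constructor in place of the original config. If the test fails on every run with this setup, the failure is tied to starting from an empty state directory rather than to something left behind in src/test/resources.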