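I have a Kafka Streams application in which I read a topic, run an aggregation, and materialize the result in a KTable. I then create a stream and run some logic on it. In that stream processing, I want to use some of the data from the KTable mentioned above. Once the streams application has started, how can I access the KTable again? I don't want to push the KTable to a new topic.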

KStream<String, MyClass> source = builder.stream("my-topic");
KTable<Windowed<String>, Long> kTable =
            source.groupBy((key, value) -> value.getKey(),
                    Grouped.<String, MyClass >as("repartition-1")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(new MyClassSerDes()))
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("test-store")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(Serdes.Long()));

Here is where I want to use the data from kTable:

inputstream.groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .count(Materialized.<myKey, Long, WindowStore<Bytes,   byte[]>>as("str")
    .withRetention(Duration.ofMinutes(30)))
    .toStream()
    .filter((k, v) -> { 
        // Here get the count for the previous Window.
        // Use that count for some computation here.
    });

1 Answer

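You can attach the KTable's store to a processor/transformer. In your case, you can replace `filter` with `flatTransform` (or a sibling such as `transform`, depending on whether you need access to the key) and connect the store to the operator: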

inputstream.groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .count(Materialized.<myKey, Long, WindowStore<Bytes, byte[]>>as("str")
        .withRetention(Duration.ofMinutes(30))
    )
    .toStream()
    // requires v2.2; otherwise use `transform()`
    // if you don't need access to the key, consider using `flatTransformValues` (v2.3)
    .flatTransform(
        () -> new Transformer<Windowed<myKey>,
                              Long,
                              List<KeyValue<Windowed<myKey>, Long>>>() {

            private ReadOnlyWindowStore<myKey, Long> store;

            public void init(final ProcessorContext context) {
                // get a handle on the store by its name
                // as specified via `Materialized` above;
                // should be read-only
                store = (ReadOnlyWindowStore<myKey, Long>)context.getStateStore("str");
            }

            public List<KeyValue<Windowed<myKey>, Long>> transform(Windowed<myKey> key,
                                                                   Long value) {

              // access `store` as you wish to make a filtering decision

              if ( ... ) {
                  // record passes
                  return Collections.singletonList(KeyValue.pair(key, value));
              } else {
                  // drop record
                  return Collections.emptyList();
              }
            }

            public void close() {} // nothing to do
        },
        "str" // connect the KTable store to the transformer using its name
              // as specified via `Materialized` above
    );
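
The `if ( ... )` above is left open in the answer. As a rough sketch of one way to fill it in for the "count of the previous window" case from the question, the lookup boils down to subtracting the window size from the current window's start and fetching by key and that timestamp. The class below is a plain-Java stand-in with no Kafka dependency: `PreviousWindowLookup`, its in-memory map, and the "count grew" rule in `passes` are all hypothetical and only illustrate the arithmetic. Inside the real transformer, the assumption is that you would call something like `store.fetch(key.key(), previousStart)` on the window store instead of the map, using `key.window().start()` as the current window start.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class PreviousWindowLookup {

    // Hypothetical in-memory stand-in for a window store:
    // record key -> (window start in ms -> count).
    private final Map<String, Map<Long, Long>> counts = new HashMap<>();

    public void put(String key, long windowStartMs, long count) {
        counts.computeIfAbsent(key, k -> new HashMap<>()).put(windowStartMs, count);
    }

    // Point lookup by key and window start; returns null if nothing is stored.
    public Long fetch(String key, long windowStartMs) {
        Map<Long, Long> perKey = counts.get(key);
        return perKey == null ? null : perKey.get(windowStartMs);
    }

    // Tumbling windows do not overlap, so the previous window
    // starts exactly one window size before the current one.
    public static long previousWindowStart(long currentWindowStartMs, Duration windowSize) {
        return currentWindowStartMs - windowSize.toMillis();
    }

    // Illustrative filtering rule (an assumption, not from the question):
    // keep the record only if the count grew compared to the previous window.
    public boolean passes(String key, long currentWindowStartMs,
                          long currentCount, Duration windowSize) {
        Long previous = fetch(key, previousWindowStart(currentWindowStartMs, windowSize));
        long previousCount = previous == null ? 0L : previous; // no previous window yet
        return currentCount > previousCount;
    }

    public static void main(String[] args) {
        PreviousWindowLookup lookup = new PreviousWindowLookup();
        Duration size = Duration.ofMinutes(1);
        lookup.put("a", 60_000L, 3L); // previous window [60s, 120s) had 3 records
        System.out.println(lookup.passes("a", 120_000L, 5L, size));
        System.out.println(lookup.passes("a", 120_000L, 2L, size));
    }
}
```

Note that a missing previous window is treated as a count of zero here; whether that is the right default depends on your use case.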
Answered 2020-07-14T06:34:48.987