我正在使用 Kafka 流处理来使用 Springboot 聚合来自源对象的数据。
@Bean
public java.util.function.Consumer<KStream<String, SourceObject>> processSourceObject() {
Serde<SourceObject> SourceObjectSerde = new JsonSerde<>(SourceObject.class);
Serde<AgrregatedObject> AgrregatedObjectSerde = new JsonSerde<>(AgrregatedObject.class);
return input -> input.map((key, value) -> new KeyValue<String, SourceObject>(value.uniques(), value))
.groupByKey(Grouped.with(Serdes.String(), SourceObjectSerde))
.aggregate(AgrregatedObject::new, (uniques, sourceObject,
destinationList) -> new SourceObjectUpdater().apply(sourceObject, destinationList),
Materialized.<String, AgrregatedObject>as(Stores.inMemoryKeyValueStore("custome-snapshots")).withKeySerde(Serdes.String()).withValueSerde(AgrregatedObjectSerde))
.toStream().foreach((foo, bar) -> process);
}
在运行此应用程序时,连同提供给processSourceObject的主题一起,它会自动创建另外两个主题
- processSourceObject-applicationId-data-snapshots-changelog
- processSourceObject-applicationId-data-snapshots-repartition
由于某些原因,我想使用现有主题而不是使用这两个主题。我在哪里进行更改以提供预定义主题的名称,以供我的应用程序用于更改日志和重新分区数据?