I have built the following Kafka Streams topology in Java.
KStream<String, Event> stream = bldr.stream(topicName, Consumed.with(Serdes.String(), eventSerde()));

// Re-key each event by order ID; the key change makes Kafka Streams create a repartition topic for the aggregation below.
KGroupedStream<String, Event> aggregateByOrderID = stream
        .map((key, event) -> new KeyValue<>(event.getOrderId(), event))
        .groupByKey(with(Serdes.String(), eventSerde()));

// Keep the latest quantity and event ID per order.
final KTable<String, StoreValue> store = aggregateByOrderID.aggregate(
        () -> new Event("", 0.0, "", "", true),
        (key, value, aggregate) -> {
            try {
                aggregate.setQuantity(value.getQuantity());
                aggregate.setFlag(Boolean.TRUE);
                aggregate.setOrderId(key);
                aggregate.setEventId(value.getId());
            } catch (NumberFormatException | NullPointerException ex) {
                LOGGER.error("Error occurred", ex);
            }
            return aggregate;
        },
        Materialized.with(Serdes.String(), eventSerde()));

// Re-key by account number; this second key change creates the second repartition topic.
KGroupedStream<String, Store> aggregateAccountQuantityStore = store.toStream()
        .map((key, value) -> new KeyValue<>(value.getAccountNumber(), value))
        .groupByKey(with(Serdes.String(), eventSerde()));

// Add each update's quantity to the running total per account.
final KTable<String, SellingQuantity> finalQuantity = aggregateAccountQuantityStore.aggregate(
        () -> new SellingQuantity("", 0.0, ""),
        (key, env, aggr) -> {
            aggr.setQuantity(aggr.getQuantity() + env.getQuantity());
            aggr.setAccountNumber(env.getAccountNumber());
            aggr.setEventId(env.getEventId());
            return aggr;
        },
        Materialized.with(Serdes.String(), eventSerde()));

finalQuantity.toStream().map((k, event) -> new KeyValue<>(k, event)).to(anotherTopic, Produced.with(Serdes.String(), eventSerde()));
The cluster is secured with SASL and uses Kerberos authentication.
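For reference, the client configuration for this kind of setup typically looks like the sketch below. It is not our actual configuration: the class name, the SASL_SSL choice (a cluster without TLS would use SASL_PLAINTEXT), and the "kafka" service name are assumptions. Only the property keys are standard Kafka client settings.

```java
import java.util.Properties;

public class SecureStreamsConfig {
    // Builds client properties for a Kerberos-secured cluster.
    // The application ID and bootstrap servers passed in are placeholders.
    public static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Assumption: SASL over TLS; use SASL_PLAINTEXT if the cluster has no TLS.
        props.setProperty("security.protocol", "SASL_SSL");
        // GSSAPI is the SASL mechanism Kafka uses for Kerberos.
        props.setProperty("sasl.mechanism", "GSSAPI");
        // Assumption: the broker principal's service name is "kafka".
        props.setProperty("sasl.kerberos.service.name", "kafka");
        return props;
    }
}
```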
The problem is that whenever I change the topology, I get this exception:
Not authorized to access topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000009-repartition, KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition]
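As far as I understand, the numeric suffix in these generated names is a zero-padded counter over the operators in the topology, so it shifts whenever operators are added or removed, and the topic created on the broker is additionally prefixed with the application.id. The plain-Java sketch below only illustrates that naming and why a list of literal topic names stops matching after a topology change. The class, its methods, and the "my-app" application ID are hypothetical and not part of the Kafka Streams API.

```java
import java.util.Set;

public class InternalTopicNames {
    // Generated store name: a fixed prefix plus a zero-padded, 10-digit counter value.
    public static String generatedStoreName(int index) {
        return "KSTREAM-AGGREGATE-STATE-STORE-" + String.format("%010d", index);
    }

    // Internal repartition topic name: <application.id>-<name>-repartition.
    public static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // Literal matching: the topic must be listed by its exact name.
    public static boolean allowedLiteral(Set<String> allowedTopics, String topic) {
        return allowedTopics.contains(topic);
    }

    // Prefix matching: any topic that starts with the given prefix is covered.
    public static boolean allowedByPrefix(String prefix, String topic) {
        return topic.startsWith(prefix);
    }

    public static void main(String[] args) {
        String appId = "my-app"; // hypothetical application.id
        // Topic authorized for the old topology, where the aggregation had index 2.
        Set<String> literal = Set.of(repartitionTopic(appId, generatedStoreName(2)));
        // After a topology change the same aggregation might get a different index, e.g. 3.
        String renamed = repartitionTopic(appId, generatedStoreName(3));
        System.out.println(renamed);
        System.out.println("literal match: " + allowedLiteral(literal, renamed));
        System.out.println("prefix match:  " + allowedByPrefix(appId + "-", renamed));
    }
}
```

If this reading is right, any authorization rule that lists the generated names one by one would need updating after every topology change, which matches what we observe.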
We are not sure how to mitigate this or how to fix it permanently.
Note: in our local setup, where no security protocol is configured, this does not cause any problems.