I have built the following Kafka Streams topology in Java.

        KStream<String, Event> stream = bldr.stream(topicName,Consumed.with(Serdes.String(), eventSerde()));

        KGroupedStream<String, Event> aggregateByOrderID = stream.map((key, event) -> new KeyValue<>(event.getOrderId(), event)).groupByKey(with(Serdes.String(), eventSerde()));

        final KTable<String, StoreValue> store = aggregateByOrderID.aggregate(() -> new Event("", 0.0, "", "", true), (key, value, aggregate) -> {
                    try {
                        // Overwrite the aggregate with the latest event for this order ID.
                        aggregate.setQuantity(value.getQuantity());
                        aggregate.setFlag(Boolean.TRUE);
                        aggregate.setOrderId(key);
                        aggregate.setEventId(value.getId());
                    } catch (NumberFormatException | NullPointerException ex) {
                        LOGGER.error("Error occurred", ex);
                    }
                    return aggregate;
                }, Materialized.with(Serdes.String(), eventSerde()));

        KGroupedStream<String, Store> aggregateAccountQuantityStore = store.toStream().map((key, value) -> new KeyValue<>(value.getAccountNumber(), value))
                .groupByKey(with(Serdes.String(), eventSerde()));

        final KTable<String, SellingQuantity> finalQuantity = aggregateAccountQuantityStore.aggregate(() -> new SellingQuantity("", 0.0, ""), (key, env, aggr) -> {
                        Double b = aggr.getQuantity() + env.getQuantity();
                        aggr.setQuantity(b);
                        aggr.setAccountNumber(env.getAccountNumber());
                        aggr.setEventId(env.getEventId());
                        return aggr;   
                }, Materialized.with(Serdes.String(), eventSerde()));
                
        finalQuantity.toStream().map((k, event) -> new KeyValue<>(k, event)).to(anotherTopic, Produced.with(Serdes.String(), eventSerde()));

The cluster is secured with SASL and uses Kerberos authentication.

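The problem is that whenever I change the topology logic, I get the following exception:

Not authorized to access topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000009-repartition, KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition]

We are not sure how to mitigate this issue or how to fix it permanently.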

Note: in our local setup we have not configured any security protocol, and there the topology runs without any problems.
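
A possible explanation, offered as a sketch and not a confirmed diagnosis: Kafka Streams names internal topics `<application.id>-<name>-repartition` and `<application.id>-<storeName>-changelog`. When no name is given, the name is generated from a counter that depends on the operator's position in the topology, so editing the topology can produce new topic names that existing ACLs do not cover. The plain-Java helper below only mimics that naming convention to show the effect; the application id `orders-app` and the name `by-order-id` are hypothetical. In Kafka Streams itself, explicit names would be supplied through `Grouped.with(name, keySerde, valueSerde)` and `Materialized.as(storeName)`.

```java
// Sketch of how Kafka Streams derives internal topic names.
// The application id and operator names used here are hypothetical.
final class InternalTopicNames {

    // Auto-generated store name: fixed prefix plus a zero-padded 10-digit index
    // that reflects the operator's position in the topology.
    static String generatedStoreName(int index) {
        return String.format("KSTREAM-AGGREGATE-STATE-STORE-%010d", index);
    }

    // Repartition topic: <application.id>-<name>-repartition
    static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // Changelog topic: <application.id>-<storeName>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String appId = "orders-app"; // hypothetical application.id

        // Generated names change when operators are added or removed upstream.
        System.out.println(repartitionTopic(appId, generatedStoreName(2)));
        System.out.println(repartitionTopic(appId, generatedStoreName(9)));

        // An explicit name stays the same however the topology is rearranged.
        System.out.println(repartitionTopic(appId, "by-order-id"));
        System.out.println(changelogTopic(appId, "by-order-id"));
    }
}
```

Because every internal topic starts with the application id, a prefixed ACL on that id, if the cluster's authorizer supports prefixed resource patterns, would cover both generated and explicitly named topics.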
