
I'm building a streams application that enriches data coming from a topic, materialized into a table. Everything worked fine until I added the materialization for an API. Does anyone know why this happens? I've already searched online for resources that explain it.

It always runs against a fresh Kafka instance.
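For context, the fresh broker per run is assumed to come from a Testcontainers setup roughly like the sketch below (the image tag and application id are placeholders, not the actual configuration); the topology that fails follows after it.

import org.apache.kafka.streams.StreamsConfig
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName
import java.util.Properties

// One clean broker per test run, so no broker-side data survives between runs
val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
kafka.start()

val props = Properties().apply {
    put(StreamsConfig.APPLICATION_ID_CONFIG, "enrichment-app")          // placeholder id
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.bootstrapServers) // container's broker
}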

val source = builder.stream<String, Example>(INPUT)

val newValues = source
    .mapValues { key, value ->
        NewValue()
    }
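    // materialize the mapped values as a table backed by a named state store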
    .toTable(Materialized.`as`(STORE_NAME))

If I remove the materialization, the code works fine.
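By "materialization for an API" I mean exposing the named store through interactive queries. A minimal sketch of that read path, assuming streams is the running KafkaStreams instance (illustrative, not the exact code):

import org.apache.kafka.streams.StoreQueryParameters
import org.apache.kafka.streams.state.QueryableStoreTypes
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

// Once the instance is RUNNING, the materialized table can be queried by key
val store: ReadOnlyKeyValueStore<String, NewValue> = streams.store(
    StoreQueryParameters.fromNameAndType(
        STORE_NAME,
        QueryableStoreTypes.keyValueStore<String, NewValue>()
    )
)
val enriched = store.get("some-key") // null if the key has not been seen yet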

It throws the following stack trace and terminates the stream:

java.lang.IllegalStateException: Tried to lookup lag for unknown task 0_1
at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor$$Lambda$1311/000000000000000000.applyAsLong(Unknown Source)
at java.base/java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511)
at java.base/java.util.Comparator$$Lambda$1312/000000000000000000.compare(Unknown Source)
at java.base/java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216)
at java.base/java.util.Comparator$$Lambda$178/000000000000000000.compare(Unknown Source)
at java.base/java.util.TreeMap.put(TreeMap.java:550)
at java.base/java.util.TreeSet.add(TreeSet.java:255)
at java.base/java.util.AbstractCollection.addAll(AbstractCollection.java:352)
at java.base/java.util.TreeSet.addAll(TreeSet.java:312)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1265)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1179)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:930)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:394)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:590)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1400(AbstractCoordinator.java:111)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:602)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:575)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1301)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)

Edit: it turned out the state store was persisting between runs of the Testcontainers tests. Calling the KafkaStreams#cleanUp method resolved the conflicting stream state.
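For anyone hitting the same thing, the fix boils down to wiping the local state directory before starting the topology in each test, roughly like this (props and builder are the test's own config and topology):

import org.apache.kafka.streams.KafkaStreams

val streams = KafkaStreams(builder.build(), props)
// Remove any local state left behind by a previous test run for this
// application.id; must be called while the instance is not running
streams.cleanUp()
streams.start()

Pointing StreamsConfig.STATE_DIR_CONFIG at a unique temporary directory per test achieves the same isolation.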

