Our Kafka Streams application keeps opening new file descriptors as new messages come in and never closes the old ones, which eventually leads to an exception. We have already raised the open-fd limit to 65k, but that doesn't seem to help.
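For anyone trying to reproduce this, a minimal way to watch the fd count from inside the JVM is the `UnixOperatingSystemMXBean` (Unix-like JVMs only; the class name `FdCount` here is just illustrative):

```java
import java.lang.management.ManagementFactory;
import com.sun.management.UnixOperatingSystemMXBean;

public class FdCount {
    public static void main(String[] args) {
        // On Linux/macOS HotSpot JVMs the OS MXBean can be cast to the
        // com.sun.management variant, which exposes process-wide fd counts.
        UnixOperatingSystemMXBean os =
                (UnixOperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        System.out.println("open fds: " + os.getOpenFileDescriptorCount()
                + " / max: " + os.getMaxFileDescriptorCount());
    }
}
```

Logging this periodically alongside the number of RocksDB state stores makes it easy to see whether the count grows without bound.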
Both the Kafka broker and the Kafka Streams library are version 2.1.
The error message that keeps showing up in the log is:
org.apache.kafka.streams.processor.internals.StreamThread.run StreamThread.java:747
org.apache.kafka.streams.processor.internals.StreamThread.runLoop StreamThread.java:777
org.apache.kafka.streams.processor.internals.StreamThread.runOnce StreamThread.java:883
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit StreamThread.java:1029
org.apache.kafka.streams.processor.internals.TaskManager.commitAll TaskManager.java:405
org.apache.kafka.streams.processor.internals.AssignedTasks.commit AssignedTasks.java:346
org.apache.kafka.streams.processor.internals.StreamTask.commit StreamTask.java:431
org.apache.kafka.streams.processor.internals.StreamTask.commit StreamTask.java:443
org.apache.kafka.streams.processor.internals.StreamTask.flushState StreamTask.java:491
org.apache.kafka.streams.processor.internals.AbstractTask.flushState AbstractTask.java:204
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush ProcessorStateManager.java:217
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush MeteredKeyValueStore.java:226
org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush WrappedStateStore.java:85
org.apache.kafka.streams.state.internals.RocksDBStore.flush RocksDBStore.java:388
org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal RocksDBStore.java:395
org.rocksdb.RocksDB.flush RocksDB.java:1743
org.rocksdb.RocksDB.flush RocksDB.java
org.rocksdb.RocksDBException: While open a file for appending: /tmp/kafka-streams/s4l-notifications-test/5_1/rocksdb/main-store/002052.sst: Too many open files
Status: #object[org.rocksdb.Status 0x1cca4c5c "org.rocksdb.Status@1cca4c5c"]
org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store main-store
Any ideas how to debug this?
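For context, Kafka Streams lets you tune the underlying RocksDB instances through the `rocksdb.config.setter` config. A rough sketch of capping the number of SST files RocksDB keeps open per store (the class name and the value 100 are illustrative, not something we have verified fixes the issue):

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class BoundedFdConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName,
                          final Options options,
                          final Map<String, Object> configs) {
        // Limit how many SST files RocksDB holds open per state store.
        // The default (-1) keeps every file open, so fd usage grows with
        // the number of SST files across all stores and partitions.
        options.setMaxOpenFiles(100);
    }
}
```

It would be registered via `props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedFdConfigSetter.class);`, but we don't know yet whether the fd growth comes from RocksDB or from somewhere else in the app.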