6

我正在我的开发 Windows 机器上编写一个 Kafka Streams 应用程序。如果我尝试使用 Kafka Streams 的leftJoinbranch功能,在执行 jar 应用程序时会出现以下错误:

Exception in thread "StreamThread-1" java.lang.UnsatisfiedLinkError: C:\Users\user\AppData\Local\Temp\librocksdbjni325337723194862275.dll: Can't find dependent libraries
    at java.lang.ClassLoader$NativeLibrary.load(Native Method)
    at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
    at java.lang.Runtime.load0(Runtime.java:809)
    at java.lang.System.load(System.java:1086)
    at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
    at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
    at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
    at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
    at org.rocksdb.Options.<clinit>(Options.java:22)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:115)
    at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:38)
    at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:75)
    at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:72)
    at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54)
    at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

似乎 Kafka 没有找到 DLL,但等等……我正在开发一个 Java 应用程序!

可能是什么问题呢?如果我尝试做更简单的流式操作,为什么这个错误不会显示出来filter

更新:

仅当代理中存在消息时才会出现此问题。我正在使用 Kafka Streams 版本 0.10.2.1。

这是引发问题的一段代码

public class KafkaStreamsMainClass {

    private KafkaStreamsMainClass() {
    }

    public static void main(final String[] args) throws Exception {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:9092");
        streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "schema-registry:8082");
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        KStreamBuilder builder = new KStreamBuilder();
        KStream<GenericRecord, GenericRecord> sourceStream = builder.stream(SOURCE_TOPIC);

        KStream<GenericRecord, GenericRecord> finishedFiltered = sourceStream
                .filter((GenericRecord key, GenericRecord value) -> value.get("endTime") != null);

        KStream<GenericRecord, GenericRecord>[] branchedStreams = sourceStream
                .filter((GenericRecord key, GenericRecord value) -> value.get("endTime") == null)
                .branch((GenericRecord key, GenericRecord value) -> value.get("firstField") != null,
                        (GenericRecord key, GenericRecord value) -> value.get("secondField") != null);

        branchedStreams[0] = finishedFiltered.join(branchedStreams[0],
                (GenericRecord value1, GenericRecord value2) -> {
                    return value1;
                }, JoinWindows.of(TimeUnit.SECONDS.toMillis(2)));

        branchedStreams[1] = finishedFiltered.join(branchedStreams[1],
                (GenericRecord value1, GenericRecord value2) -> {
                    return value1;
                }, JoinWindows.of(TimeUnit.SECONDS.toMillis(2)));

        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
            throwable.printStackTrace();
        });
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

}

我打开了rocksdbjni-5.0.1.jarMaven 下载的存档,其中包含该librocksdbjni-win64.dll库。似乎它试图从 RocksDB 外部而不是内部检索库。

我正在 Windows 7 机器上开发。

你有没有遇到过这个问题?

4

6 回答 6

12

最近我也遇到了这个问题。我设法通过两个步骤解决了这个问题:

  1. librocksdbjni[...].dll从文件夹中删除所有文件C:\Users\[your_user]\AppData\Local\Temp
  2. 在您的项目中添加 maven 依赖项rocksdb,这对我有用: https ://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/5.0.1

编译您的 Kafka Stream 应用程序并运行它。它应该工作!

于 2018-01-08T10:51:55.043 回答
2

我将我的 kafka-streams 项目更新为最新发布的 1.0.0 版本。

此版本存在错误,但在对其进行修补并将此修补版本上传到内部 Artifactory 服务器后,我们能够在 Windows 和 Linux 上执行我们的 kafka-streams 代理。下一个版本 1.0.1 和 1.1.0 将修复此错误,因此一旦这些版本之一发布,我们将切换到它们而不是修补版本。

总而言之,Kafka 的家伙在 1.0.0 版本中解决了这个错误。

于 2018-01-19T11:38:14.430 回答
1

我的问题是/tmp/目录中的权限(CentOS)

Rockdb 使用

java.io.tmpdir 

系统属性在内部决定放置librocksdbjni文件的位置,通常是这样的/tmp/librocksdbjni2925599838907625983.so

通过在 kafka-streams 应用程序中设置具有适当权限的不同 tempdir 属性来解决。

System.setProperty("java.io.tmpdir", "/opt/kafka-streams/tmp");
于 2020-01-20T10:25:18.623 回答
0

您缺少rocksdb dll 所依赖的一些本机库。见https://github.com/facebook/rocksdb/issues/1302

于 2017-05-02T19:53:36.680 回答
0

我在使用 jdk 1.8 时遇到了同样的问题。当我将其更改为 jre 时,它​​得到了解决。

于 2020-05-20T07:48:12.173 回答
0

在 Mac 中遇到了类似的问题。根据此链接,https://github.com/facebook/rocksdb/issues/5064问题与安装在我的 Mac OS 版本(10.11.6)中的旧 libc 有关。

于 2020-08-06T12:01:48.500 回答